everscale_network/rldp/
node.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use tokio::sync::Semaphore;
6
7use super::compression;
8use super::transfers_cache::*;
9use crate::adnl;
10use crate::proto;
11use crate::subscriber::*;
12use crate::util::*;
13
14#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
16#[serde(default)]
17pub struct NodeOptions {
18 pub max_answer_size: u32,
23
24 pub max_peer_queries: usize,
28
29 pub query_min_timeout_ms: u64,
33
34 pub query_max_timeout_ms: u64,
38
39 pub query_wave_len: u32,
43
44 pub query_wave_interval_ms: u64,
48
49 pub force_compression: bool,
53}
54
55impl Default for NodeOptions {
56 fn default() -> Self {
57 Self {
58 max_answer_size: 10 * 1024 * 1024,
59 max_peer_queries: 16,
60 query_min_timeout_ms: 500,
61 query_max_timeout_ms: 10000,
62 query_wave_len: 10,
63 query_wave_interval_ms: 10,
64 force_compression: false,
65 }
66 }
67}
68
69pub struct Node {
71 adnl: Arc<adnl::Node>,
73 semaphores: FastDashMap<adnl::NodeIdShort, Arc<Semaphore>>,
75 transfers: Arc<TransfersCache>,
77 options: NodeOptions,
79}
80
81impl Node {
82 pub fn new(
84 adnl: Arc<adnl::Node>,
85 subscribers: Vec<Arc<dyn QuerySubscriber>>,
86 options: NodeOptions,
87 ) -> Result<Arc<Self>> {
88 let transfers = Arc::new(TransfersCache::new(subscribers, options));
89
90 adnl.add_message_subscriber(transfers.clone())?;
91
92 Ok(Arc::new(Self {
93 adnl,
94 semaphores: Default::default(),
95 transfers,
96 options,
97 }))
98 }
99
100 #[inline(always)]
102 pub fn adnl(&self) -> &Arc<adnl::Node> {
103 &self.adnl
104 }
105
106 #[inline(always)]
107 pub fn options(&self) -> &NodeOptions {
108 &self.options
109 }
110
111 pub fn metrics(&self) -> NodeMetrics {
112 NodeMetrics {
113 peer_count: self.semaphores.len(),
114 transfers_cache_len: self.transfers.len(),
115 }
116 }
117
118 pub fn gc(&self) {
120 let max_permits = self.options.max_peer_queries;
121 self.semaphores
122 .retain(|_, semaphore| semaphore.available_permits() < max_permits);
123 }
124
125 #[tracing::instrument(level = "debug", name = "rldp_query", skip_all, fields(%local_id, %peer_id, ?roundtrip))]
126 pub async fn query(
127 &self,
128 local_id: &adnl::NodeIdShort,
129 peer_id: &adnl::NodeIdShort,
130 data: Vec<u8>,
131 roundtrip: Option<u64>,
132 ) -> Result<(Option<Vec<u8>>, u64)> {
133 let (query_id, query) = self.make_query(data);
134
135 let peer = self
136 .semaphores
137 .entry(*peer_id)
138 .or_insert_with(|| Arc::new(Semaphore::new(self.options.max_peer_queries)))
139 .value()
140 .clone();
141
142 let result = {
143 let _permit = peer.acquire().await.ok();
144 self.transfers
145 .query(&self.adnl, local_id, peer_id, query, roundtrip)
146 .await
147 };
148
149 match result? {
150 (Some(answer), roundtrip) => match tl_proto::deserialize(&answer) {
151 Ok(proto::rldp::Message::Answer {
152 query_id: answer_id,
153 data,
154 }) if answer_id == &query_id => Ok((
155 Some(compression::decompress(data).unwrap_or_else(|| data.to_vec())),
156 roundtrip,
157 )),
158 Ok(proto::rldp::Message::Answer { .. }) => Err(NodeError::QueryIdMismatch.into()),
159 Ok(proto::rldp::Message::Message { .. }) => {
160 Err(NodeError::UnexpectedAnswer("RldpMessageView::Message").into())
161 }
162 Ok(proto::rldp::Message::Query { .. }) => {
163 Err(NodeError::UnexpectedAnswer("RldpMessageView::Query").into())
164 }
165 Err(e) => Err(NodeError::InvalidPacketContent(e).into()),
166 },
167 (None, roundtrip) => Ok((None, roundtrip)),
168 }
169 }
170
171 fn make_query(&self, mut data: Vec<u8>) -> ([u8; 32], Vec<u8>) {
172 if self.options.force_compression {
173 if let Err(e) = compression::compress(&mut data) {
174 tracing::warn!("failed to compress RLDP query: {e:?}");
175 }
176 }
177
178 let query_id = gen_fast_bytes();
179 let data = proto::rldp::Message::Query {
180 query_id: &query_id,
181 max_answer_size: self.options.max_answer_size as u64,
182 timeout: now() + self.options.query_max_timeout_ms as u32 / 1000,
183 data: &data,
184 };
185 (query_id, tl_proto::serialize(data))
186 }
187}
188
189#[async_trait::async_trait]
190impl MessageSubscriber for TransfersCache {
191 async fn try_consume_custom<'a>(
192 &self,
193 ctx: SubscriberContext<'a>,
194 constructor: u32,
195 data: &'a [u8],
196 ) -> Result<bool> {
197 if constructor != proto::rldp::MessagePart::TL_ID_MESSAGE_PART
198 && constructor != proto::rldp::MessagePart::TL_ID_CONFIRM
199 && constructor != proto::rldp::MessagePart::TL_ID_COMPLETE
200 {
201 return Ok(false);
202 }
203
204 let message_part = tl_proto::deserialize(data)?;
205 self.handle_message(ctx.adnl, ctx.local_id, ctx.peer_id, message_part)
206 .await?;
207
208 Ok(true)
209 }
210}
211
212#[derive(Debug, Copy, Clone)]
214pub struct NodeMetrics {
215 pub peer_count: usize,
216 pub transfers_cache_len: usize,
217}
218
219#[derive(thiserror::Error, Debug)]
220enum NodeError {
221 #[error("Unexpected answer: {0}")]
222 UnexpectedAnswer(&'static str),
223 #[error("Invalid packet content: {0:?}")]
224 InvalidPacketContent(tl_proto::TlError),
225 #[error("Unknown query id")]
226 QueryIdMismatch,
227}