everscale_network/rldp/
node.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use tokio::sync::Semaphore;
6
7use super::compression;
8use super::transfers_cache::*;
9use crate::adnl;
10use crate::proto;
11use crate::subscriber::*;
12use crate::util::*;
13
14/// RLDP node configuration
15#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
16#[serde(default)]
17pub struct NodeOptions {
18    /// Max allowed RLDP answer size in bytes. Query will be rejected
19    /// if answer is bigger.
20    ///
21    /// Default: `10485760` (10 MB)
22    pub max_answer_size: u32,
23
24    /// Max parallel RLDP queries per peer.
25    ///
26    /// Default: `16`
27    pub max_peer_queries: usize,
28
29    /// Min RLDP query timeout.
30    ///
31    /// Default: `500` ms
32    pub query_min_timeout_ms: u64,
33
34    /// Max RLDP query timeout
35    ///
36    /// Default: `10000` ms
37    pub query_max_timeout_ms: u64,
38
39    /// Number of FEC messages to send in group. There will be a short delay between them.
40    ///
41    /// Default: `10`
42    pub query_wave_len: u32,
43
44    /// Interval between FEC broadcast waves.
45    ///
46    /// Default: `10` ms
47    pub query_wave_interval_ms: u64,
48
49    /// Whether requests will be compressed.
50    ///
51    /// Default: `false`
52    pub force_compression: bool,
53}
54
55impl Default for NodeOptions {
56    fn default() -> Self {
57        Self {
58            max_answer_size: 10 * 1024 * 1024,
59            max_peer_queries: 16,
60            query_min_timeout_ms: 500,
61            query_max_timeout_ms: 10000,
62            query_wave_len: 10,
63            query_wave_interval_ms: 10,
64            force_compression: false,
65        }
66    }
67}
68
69/// Reliable UDP transport layer
70pub struct Node {
71    /// Underlying ADNL node
72    adnl: Arc<adnl::Node>,
73    /// Parallel requests limiter
74    semaphores: FastDashMap<adnl::NodeIdShort, Arc<Semaphore>>,
75    /// Transfers handler
76    transfers: Arc<TransfersCache>,
77    /// Configuration
78    options: NodeOptions,
79}
80
81impl Node {
82    /// Create new RLDP node on top of the given ADNL node
83    pub fn new(
84        adnl: Arc<adnl::Node>,
85        subscribers: Vec<Arc<dyn QuerySubscriber>>,
86        options: NodeOptions,
87    ) -> Result<Arc<Self>> {
88        let transfers = Arc::new(TransfersCache::new(subscribers, options));
89
90        adnl.add_message_subscriber(transfers.clone())?;
91
92        Ok(Arc::new(Self {
93            adnl,
94            semaphores: Default::default(),
95            transfers,
96            options,
97        }))
98    }
99
100    /// Underlying ADNL node
101    #[inline(always)]
102    pub fn adnl(&self) -> &Arc<adnl::Node> {
103        &self.adnl
104    }
105
106    #[inline(always)]
107    pub fn options(&self) -> &NodeOptions {
108        &self.options
109    }
110
111    pub fn metrics(&self) -> NodeMetrics {
112        NodeMetrics {
113            peer_count: self.semaphores.len(),
114            transfers_cache_len: self.transfers.len(),
115        }
116    }
117
118    /// Clears semaphores table
119    pub fn gc(&self) {
120        let max_permits = self.options.max_peer_queries;
121        self.semaphores
122            .retain(|_, semaphore| semaphore.available_permits() < max_permits);
123    }
124
125    #[tracing::instrument(level = "debug", name = "rldp_query", skip_all, fields(%local_id, %peer_id, ?roundtrip))]
126    pub async fn query(
127        &self,
128        local_id: &adnl::NodeIdShort,
129        peer_id: &adnl::NodeIdShort,
130        data: Vec<u8>,
131        roundtrip: Option<u64>,
132    ) -> Result<(Option<Vec<u8>>, u64)> {
133        let (query_id, query) = self.make_query(data);
134
135        let peer = self
136            .semaphores
137            .entry(*peer_id)
138            .or_insert_with(|| Arc::new(Semaphore::new(self.options.max_peer_queries)))
139            .value()
140            .clone();
141
142        let result = {
143            let _permit = peer.acquire().await.ok();
144            self.transfers
145                .query(&self.adnl, local_id, peer_id, query, roundtrip)
146                .await
147        };
148
149        match result? {
150            (Some(answer), roundtrip) => match tl_proto::deserialize(&answer) {
151                Ok(proto::rldp::Message::Answer {
152                    query_id: answer_id,
153                    data,
154                }) if answer_id == &query_id => Ok((
155                    Some(compression::decompress(data).unwrap_or_else(|| data.to_vec())),
156                    roundtrip,
157                )),
158                Ok(proto::rldp::Message::Answer { .. }) => Err(NodeError::QueryIdMismatch.into()),
159                Ok(proto::rldp::Message::Message { .. }) => {
160                    Err(NodeError::UnexpectedAnswer("RldpMessageView::Message").into())
161                }
162                Ok(proto::rldp::Message::Query { .. }) => {
163                    Err(NodeError::UnexpectedAnswer("RldpMessageView::Query").into())
164                }
165                Err(e) => Err(NodeError::InvalidPacketContent(e).into()),
166            },
167            (None, roundtrip) => Ok((None, roundtrip)),
168        }
169    }
170
171    fn make_query(&self, mut data: Vec<u8>) -> ([u8; 32], Vec<u8>) {
172        if self.options.force_compression {
173            if let Err(e) = compression::compress(&mut data) {
174                tracing::warn!("failed to compress RLDP query: {e:?}");
175            }
176        }
177
178        let query_id = gen_fast_bytes();
179        let data = proto::rldp::Message::Query {
180            query_id: &query_id,
181            max_answer_size: self.options.max_answer_size as u64,
182            timeout: now() + self.options.query_max_timeout_ms as u32 / 1000,
183            data: &data,
184        };
185        (query_id, tl_proto::serialize(data))
186    }
187}
188
189#[async_trait::async_trait]
190impl MessageSubscriber for TransfersCache {
191    async fn try_consume_custom<'a>(
192        &self,
193        ctx: SubscriberContext<'a>,
194        constructor: u32,
195        data: &'a [u8],
196    ) -> Result<bool> {
197        if constructor != proto::rldp::MessagePart::TL_ID_MESSAGE_PART
198            && constructor != proto::rldp::MessagePart::TL_ID_CONFIRM
199            && constructor != proto::rldp::MessagePart::TL_ID_COMPLETE
200        {
201            return Ok(false);
202        }
203
204        let message_part = tl_proto::deserialize(data)?;
205        self.handle_message(ctx.adnl, ctx.local_id, ctx.peer_id, message_part)
206            .await?;
207
208        Ok(true)
209    }
210}
211
212/// Instant RLDP node metrics
213#[derive(Debug, Copy, Clone)]
214pub struct NodeMetrics {
215    pub peer_count: usize,
216    pub transfers_cache_len: usize,
217}
218
219#[derive(thiserror::Error, Debug)]
220enum NodeError {
221    #[error("Unexpected answer: {0}")]
222    UnexpectedAnswer(&'static str),
223    #[error("Invalid packet content: {0:?}")]
224    InvalidPacketContent(tl_proto::TlError),
225    #[error("Unknown query id")]
226    QueryIdMismatch,
227}