everscale_network/overlay/
node.rs

1use std::borrow::Cow;
2use std::sync::Arc;
3
4use anyhow::Result;
5use tl_proto::{BoxedConstructor, TlRead};
6
7use super::overlay::{Overlay, OverlayMetrics, OverlayOptions};
8use super::overlay_id::IdShort;
9use crate::adnl;
10use crate::proto;
11use crate::subscriber::*;
12use crate::util::*;
13
14/// P2P messages distribution layer group
15pub struct Node {
16    /// Underlying ADNL node
17    adnl: Arc<adnl::Node>,
18    /// Local ADNL key
19    node_key: Arc<adnl::Key>,
20    /// Shared state
21    state: Arc<NodeState>,
22}
23
24impl Node {
25    pub fn new(adnl: Arc<adnl::Node>, key_tag: usize) -> Result<Arc<Self>> {
26        let node_key = adnl.key_by_tag(key_tag)?.clone();
27        let state = Arc::new(NodeState::default());
28
29        adnl.add_query_subscriber(state.clone())?;
30        adnl.add_message_subscriber(state.clone())?;
31
32        Ok(Arc::new(Self {
33            adnl,
34            node_key,
35            state,
36        }))
37    }
38
39    /// Returns inner query subscriber
40    pub fn query_subscriber(&self) -> Arc<dyn QuerySubscriber> {
41        self.state.clone()
42    }
43
44    /// Returns metrics for all overlays
45    pub fn metrics(&self) -> impl Iterator<Item = (IdShort, OverlayMetrics)> + '_ {
46        self.state
47            .overlays
48            .iter()
49            .map(|item| (*item.id(), item.metrics()))
50    }
51
52    /// Underlying ADNL node
53    pub fn adnl(&self) -> &Arc<adnl::Node> {
54        &self.adnl
55    }
56
57    /// Adds overlay queries subscriber
58    pub fn add_overlay_subscriber(
59        &self,
60        overlay_id: IdShort,
61        subscriber: Arc<dyn QuerySubscriber>,
62    ) -> bool {
63        use dashmap::mapref::entry::Entry;
64
65        match self.state.subscribers.entry(overlay_id) {
66            Entry::Vacant(entry) => {
67                entry.insert(subscriber);
68                true
69            }
70            Entry::Occupied(_) => false,
71        }
72    }
73
74    /// Creates new public overlay
75    pub fn add_public_overlay(
76        &self,
77        overlay_id: &IdShort,
78        options: OverlayOptions,
79    ) -> (Arc<Overlay>, bool) {
80        use dashmap::mapref::entry::Entry;
81
82        match self.state.overlays.entry(*overlay_id) {
83            Entry::Vacant(entry) => {
84                let overlay = Overlay::new(self.node_key.clone(), *overlay_id, &[], options);
85                entry.insert(overlay.clone());
86                (overlay, true)
87            }
88            Entry::Occupied(entry) => (entry.get().clone(), false),
89        }
90    }
91
92    /// Creates new private overlay
93    pub fn add_private_overlay(
94        &self,
95        overlay_id: &IdShort,
96        overlay_key: Arc<adnl::Key>,
97        peers: &[adnl::NodeIdShort],
98        options: OverlayOptions,
99    ) -> (Arc<Overlay>, bool) {
100        use dashmap::mapref::entry::Entry;
101
102        match self.state.overlays.entry(*overlay_id) {
103            Entry::Vacant(entry) => {
104                let overlay = Overlay::new(overlay_key, *overlay_id, peers, options);
105                entry.insert(overlay.clone());
106                (overlay, true)
107            }
108            Entry::Occupied(entry) => (entry.get().clone(), false),
109        }
110    }
111
112    /// Returns overlay by specified id
113    #[inline(always)]
114    pub fn get_overlay(&self, overlay_id: &IdShort) -> Result<Arc<Overlay>> {
115        self.state.get_overlay(overlay_id)
116    }
117}
118
119#[derive(Default)]
120struct NodeState {
121    /// Overlays by ids
122    overlays: FastDashMap<IdShort, Arc<Overlay>>,
123    /// Overlay query subscribers
124    subscribers: FastDashMap<IdShort, Arc<dyn QuerySubscriber>>,
125}
126
127impl NodeState {
128    fn get_overlay(&self, overlay_id: &IdShort) -> Result<Arc<Overlay>> {
129        match self.overlays.get(overlay_id) {
130            Some(overlay) => Ok(overlay.clone()),
131            None => Err(NodeError::UnknownOverlay.into()),
132        }
133    }
134}
135
136#[async_trait::async_trait]
137impl MessageSubscriber for NodeState {
138    async fn try_consume_custom<'a>(
139        &self,
140        ctx: SubscriberContext<'a>,
141        constructor: u32,
142        data: &'a [u8],
143    ) -> Result<bool> {
144        if constructor != proto::overlay::Message::TL_ID {
145            return Ok(false);
146        }
147
148        let mut offset = 4; // skip `overlay::Message` constructor
149        let overlay_id = IdShort::from(<[u8; 32]>::read_from(data, &mut offset)?);
150        let broadcast = proto::overlay::Broadcast::read_from(data, &mut offset)?;
151
152        // TODO: check that offset == data.len()
153
154        let overlay = self.get_overlay(&overlay_id)?;
155        match broadcast {
156            proto::overlay::Broadcast::Broadcast(broadcast) => {
157                overlay
158                    .receive_broadcast(ctx.adnl, ctx.local_id, ctx.peer_id, broadcast, data)
159                    .await?;
160                Ok(true)
161            }
162            proto::overlay::Broadcast::BroadcastFec(broadcast) => {
163                overlay
164                    .receive_fec_broadcast(ctx.adnl, ctx.local_id, ctx.peer_id, broadcast, data)
165                    .await?;
166                Ok(true)
167            }
168            _ => Err(NodeError::UnsupportedOverlayBroadcastMessage.into()),
169        }
170    }
171}
172
173#[async_trait::async_trait]
174impl QuerySubscriber for NodeState {
175    async fn try_consume_query<'a>(
176        &self,
177        ctx: SubscriberContext<'a>,
178        constructor: u32,
179        query: Cow<'a, [u8]>,
180    ) -> Result<QueryConsumingResult<'a>> {
181        if constructor != proto::rpc::OverlayQuery::TL_ID {
182            return Ok(QueryConsumingResult::Rejected(query));
183        }
184
185        let mut offset = 4; // skip `rpc::OverlayQuery` constructor
186        let overlay_id = IdShort::from(<[u8; 32]>::read_from(&query, &mut offset)?);
187
188        let constructor = u32::read_from(&query, &mut std::convert::identity(offset))?;
189        if constructor == proto::rpc::OverlayGetRandomPeers::TL_ID {
190            let query = proto::rpc::OverlayGetRandomPeers::read_from(&query, &mut offset)?;
191            let overlay = self.get_overlay(&overlay_id)?;
192            return QueryConsumingResult::consume(
193                overlay.process_get_random_peers(query).into_boxed(),
194            );
195        }
196
197        let consumer = match self.subscribers.get(&overlay_id) {
198            Some(consumer) => consumer.clone(),
199            None => return Err(NodeError::NoConsumerFound.into()),
200        };
201
202        match consumer
203            .try_consume_query(ctx, constructor, Cow::Borrowed(&query[offset..]))
204            .await?
205        {
206            QueryConsumingResult::Consumed(result) => Ok(QueryConsumingResult::Consumed(result)),
207            QueryConsumingResult::Rejected(_) => Err(NodeError::UnsupportedQuery.into()),
208        }
209    }
210}
211
212#[derive(thiserror::Error, Debug)]
213enum NodeError {
214    #[error("Unsupported overlay broadcast message")]
215    UnsupportedOverlayBroadcastMessage,
216    #[error("Unknown overlay")]
217    UnknownOverlay,
218    #[error("No consumer for message in overlay")]
219    NoConsumerFound,
220    #[error("Unsupported query")]
221    UnsupportedQuery,
222}