everscale_network/overlay/
node.rs1use std::borrow::Cow;
2use std::sync::Arc;
3
4use anyhow::Result;
5use tl_proto::{BoxedConstructor, TlRead};
6
7use super::overlay::{Overlay, OverlayMetrics, OverlayOptions};
8use super::overlay_id::IdShort;
9use crate::adnl;
10use crate::proto;
11use crate::subscriber::*;
12use crate::util::*;
13
14pub struct Node {
16 adnl: Arc<adnl::Node>,
18 node_key: Arc<adnl::Key>,
20 state: Arc<NodeState>,
22}
23
24impl Node {
25 pub fn new(adnl: Arc<adnl::Node>, key_tag: usize) -> Result<Arc<Self>> {
26 let node_key = adnl.key_by_tag(key_tag)?.clone();
27 let state = Arc::new(NodeState::default());
28
29 adnl.add_query_subscriber(state.clone())?;
30 adnl.add_message_subscriber(state.clone())?;
31
32 Ok(Arc::new(Self {
33 adnl,
34 node_key,
35 state,
36 }))
37 }
38
39 pub fn query_subscriber(&self) -> Arc<dyn QuerySubscriber> {
41 self.state.clone()
42 }
43
44 pub fn metrics(&self) -> impl Iterator<Item = (IdShort, OverlayMetrics)> + '_ {
46 self.state
47 .overlays
48 .iter()
49 .map(|item| (*item.id(), item.metrics()))
50 }
51
52 pub fn adnl(&self) -> &Arc<adnl::Node> {
54 &self.adnl
55 }
56
57 pub fn add_overlay_subscriber(
59 &self,
60 overlay_id: IdShort,
61 subscriber: Arc<dyn QuerySubscriber>,
62 ) -> bool {
63 use dashmap::mapref::entry::Entry;
64
65 match self.state.subscribers.entry(overlay_id) {
66 Entry::Vacant(entry) => {
67 entry.insert(subscriber);
68 true
69 }
70 Entry::Occupied(_) => false,
71 }
72 }
73
74 pub fn add_public_overlay(
76 &self,
77 overlay_id: &IdShort,
78 options: OverlayOptions,
79 ) -> (Arc<Overlay>, bool) {
80 use dashmap::mapref::entry::Entry;
81
82 match self.state.overlays.entry(*overlay_id) {
83 Entry::Vacant(entry) => {
84 let overlay = Overlay::new(self.node_key.clone(), *overlay_id, &[], options);
85 entry.insert(overlay.clone());
86 (overlay, true)
87 }
88 Entry::Occupied(entry) => (entry.get().clone(), false),
89 }
90 }
91
92 pub fn add_private_overlay(
94 &self,
95 overlay_id: &IdShort,
96 overlay_key: Arc<adnl::Key>,
97 peers: &[adnl::NodeIdShort],
98 options: OverlayOptions,
99 ) -> (Arc<Overlay>, bool) {
100 use dashmap::mapref::entry::Entry;
101
102 match self.state.overlays.entry(*overlay_id) {
103 Entry::Vacant(entry) => {
104 let overlay = Overlay::new(overlay_key, *overlay_id, peers, options);
105 entry.insert(overlay.clone());
106 (overlay, true)
107 }
108 Entry::Occupied(entry) => (entry.get().clone(), false),
109 }
110 }
111
112 #[inline(always)]
114 pub fn get_overlay(&self, overlay_id: &IdShort) -> Result<Arc<Overlay>> {
115 self.state.get_overlay(overlay_id)
116 }
117}
118
119#[derive(Default)]
120struct NodeState {
121 overlays: FastDashMap<IdShort, Arc<Overlay>>,
123 subscribers: FastDashMap<IdShort, Arc<dyn QuerySubscriber>>,
125}
126
127impl NodeState {
128 fn get_overlay(&self, overlay_id: &IdShort) -> Result<Arc<Overlay>> {
129 match self.overlays.get(overlay_id) {
130 Some(overlay) => Ok(overlay.clone()),
131 None => Err(NodeError::UnknownOverlay.into()),
132 }
133 }
134}
135
136#[async_trait::async_trait]
137impl MessageSubscriber for NodeState {
138 async fn try_consume_custom<'a>(
139 &self,
140 ctx: SubscriberContext<'a>,
141 constructor: u32,
142 data: &'a [u8],
143 ) -> Result<bool> {
144 if constructor != proto::overlay::Message::TL_ID {
145 return Ok(false);
146 }
147
148 let mut offset = 4; let overlay_id = IdShort::from(<[u8; 32]>::read_from(data, &mut offset)?);
150 let broadcast = proto::overlay::Broadcast::read_from(data, &mut offset)?;
151
152 let overlay = self.get_overlay(&overlay_id)?;
155 match broadcast {
156 proto::overlay::Broadcast::Broadcast(broadcast) => {
157 overlay
158 .receive_broadcast(ctx.adnl, ctx.local_id, ctx.peer_id, broadcast, data)
159 .await?;
160 Ok(true)
161 }
162 proto::overlay::Broadcast::BroadcastFec(broadcast) => {
163 overlay
164 .receive_fec_broadcast(ctx.adnl, ctx.local_id, ctx.peer_id, broadcast, data)
165 .await?;
166 Ok(true)
167 }
168 _ => Err(NodeError::UnsupportedOverlayBroadcastMessage.into()),
169 }
170 }
171}
172
173#[async_trait::async_trait]
174impl QuerySubscriber for NodeState {
175 async fn try_consume_query<'a>(
176 &self,
177 ctx: SubscriberContext<'a>,
178 constructor: u32,
179 query: Cow<'a, [u8]>,
180 ) -> Result<QueryConsumingResult<'a>> {
181 if constructor != proto::rpc::OverlayQuery::TL_ID {
182 return Ok(QueryConsumingResult::Rejected(query));
183 }
184
185 let mut offset = 4; let overlay_id = IdShort::from(<[u8; 32]>::read_from(&query, &mut offset)?);
187
188 let constructor = u32::read_from(&query, &mut std::convert::identity(offset))?;
189 if constructor == proto::rpc::OverlayGetRandomPeers::TL_ID {
190 let query = proto::rpc::OverlayGetRandomPeers::read_from(&query, &mut offset)?;
191 let overlay = self.get_overlay(&overlay_id)?;
192 return QueryConsumingResult::consume(
193 overlay.process_get_random_peers(query).into_boxed(),
194 );
195 }
196
197 let consumer = match self.subscribers.get(&overlay_id) {
198 Some(consumer) => consumer.clone(),
199 None => return Err(NodeError::NoConsumerFound.into()),
200 };
201
202 match consumer
203 .try_consume_query(ctx, constructor, Cow::Borrowed(&query[offset..]))
204 .await?
205 {
206 QueryConsumingResult::Consumed(result) => Ok(QueryConsumingResult::Consumed(result)),
207 QueryConsumingResult::Rejected(_) => Err(NodeError::UnsupportedQuery.into()),
208 }
209 }
210}
211
212#[derive(thiserror::Error, Debug)]
213enum NodeError {
214 #[error("Unsupported overlay broadcast message")]
215 UnsupportedOverlayBroadcastMessage,
216 #[error("Unknown overlay")]
217 UnknownOverlay,
218 #[error("No consumer for message in overlay")]
219 NoConsumerFound,
220 #[error("Unsupported query")]
221 UnsupportedQuery,
222}