dusk_node/
databroker.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7pub mod conf;
8
9use std::cmp::min;
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13use anyhow::{anyhow, Result};
14use async_trait::async_trait;
15use node_data::message::payload::{self, GetResource, InvParam, InvType};
16use node_data::message::{AsyncQueue, Payload, Topics};
17use smallvec::SmallVec;
18use tokio::sync::{RwLock, Semaphore};
19use tracing::{debug, info, warn};
20
21use crate::database::{ConsensusStorage, Ledger, Mempool};
22use crate::{database, vm, LongLivedService, Message, Network};
23
24const TOPICS: &[u8] = &[
25    Topics::GetBlocks as u8,
26    Topics::GetMempool as u8,
27    Topics::Inv as u8,
28    Topics::GetResource as u8,
29];
30
31struct Response {
32    /// A response usually consists of a single message. However, in case of
33    /// GetMempool and GetBlocks we may need to send multiple messages in
34    /// response to a single request.
35    msgs: SmallVec<[Message; 1]>,
36
37    /// Destination address of the response.
38    recv_peer: SocketAddr,
39}
40
41impl Response {
42    fn new(msgs: Vec<Message>, recv_peer: SocketAddr) -> Self {
43        Self {
44            msgs: SmallVec::from_vec(msgs),
45            recv_peer,
46        }
47    }
48
49    /// Creates a new response from a single message.
50    fn new_from_msg(msg: Message, recv_peer: SocketAddr) -> Self {
51        Self {
52            msgs: SmallVec::from_buf([msg]),
53            recv_peer,
54        }
55    }
56}
57/// Implements a request-for-data service.
58///
59/// The data broker acts as an intermediary between data producers (such as
60/// Ledger, Candidates and Mempool databases ) and data consumers which could be
61/// any node in the network that needs to recover any state.
62///
63/// Similar to a HTTP Server, the DataBroker service processes each request in
64/// a separate tokio::task.
65///
66/// It also limits the number of concurrent requests.
67pub struct DataBrokerSrv {
68    /// A queue of pending requests to process.
69    /// Request here is literally a GET message
70    inbound: AsyncQueue<Message>,
71
72    /// Limits the number of ongoing requests.
73    limit_ongoing_requests: Arc<Semaphore>,
74
75    conf: conf::Params,
76}
77
78impl DataBrokerSrv {
79    pub fn new(conf: conf::Params) -> Self {
80        info!("DataBrokerSrv::new with conf: {conf:?}");
81        Self {
82            conf,
83            inbound: AsyncQueue::bounded(
84                conf.max_queue_size,
85                "databroker_inbound",
86            ),
87            limit_ongoing_requests: Arc::new(Semaphore::new(
88                conf.max_ongoing_requests,
89            )),
90        }
91    }
92}
93
94#[async_trait]
95impl<N: Network, DB: database::DB, VM: vm::VMExecution>
96    LongLivedService<N, DB, VM> for DataBrokerSrv
97{
98    async fn execute(
99        &mut self,
100        network: Arc<RwLock<N>>,
101        db: Arc<RwLock<DB>>,
102        _vm: Arc<RwLock<VM>>,
103    ) -> anyhow::Result<usize> {
104        if self.conf.max_ongoing_requests == 0 {
105            return Err(anyhow!("max_ongoing_requests must be greater than 0"));
106        }
107
108        // Register routes
109        LongLivedService::<N, DB, VM>::add_routes(
110            self,
111            TOPICS,
112            self.inbound.clone(),
113            &network,
114        )
115        .await?;
116
117        info!("data_broker service started");
118
119        loop {
120            // Wait until we can process a new request. We limit the number of
121            // concurrent requests to mitigate a DoS attack.
122            let permit =
123                self.limit_ongoing_requests.clone().acquire_owned().await?;
124
125            // Wait for a request to process.
126            let msg = self.inbound.recv().await?;
127
128            let network = network.clone();
129            let db = db.clone();
130            let conf = self.conf;
131
132            // Spawn a task to handle the request asynchronously.
133            tokio::spawn(async move {
134                match Self::handle_request::<N, DB>(&db, &network, &msg, &conf)
135                    .await
136                {
137                    Ok(resp) => {
138                        // Send response
139                        let net = network.read().await;
140                        for msg in resp.msgs {
141                            let send = net.send_to_peer(msg, resp.recv_peer);
142                            if let Err(e) = send.await {
143                                warn!("Unable to send_to_peer {e}")
144                            };
145
146                            // Mitigate pressure on UDP buffers.
147                            // Needed only in localnet.
148                            if let Some(milli_sec) = conf.delay_on_resp_msg {
149                                tokio::time::sleep(
150                                    std::time::Duration::from_millis(milli_sec),
151                                )
152                                .await;
153                            }
154                        }
155                    }
156                    Err(e) => {
157                        warn!("error on handling msg: {}", e);
158                    }
159                };
160
161                // Release the permit.
162                drop(permit);
163            });
164        }
165    }
166
167    /// Returns service name.
168    fn name(&self) -> &'static str {
169        "data_broker"
170    }
171}
172
173impl DataBrokerSrv {
174    /// Handles inbound messages.
175    async fn handle_request<N: Network, DB: database::DB>(
176        db: &Arc<RwLock<DB>>,
177        network: &Arc<RwLock<N>>,
178        msg: &Message,
179        conf: &conf::Params,
180    ) -> anyhow::Result<Response> {
181        // source address of the request becomes the receiver address of the
182        // response
183        let recv_peer = msg
184            .metadata
185            .as_ref()
186            .map(|m| m.src_addr)
187            .ok_or_else(|| anyhow::anyhow!("invalid metadata src_addr"))?;
188
189        debug!(event = "request received", ?msg.payload, ?msg.metadata);
190        let this_peer = *network.read().await.public_addr();
191
192        match &msg.payload {
193            // Handle GetBlocks requests
194            Payload::GetBlocks(m) => {
195                let msg = Self::handle_get_blocks(db, m, conf.max_inv_entries)
196                    .await?;
197                Ok(Response::new_from_msg(msg, recv_peer))
198            }
199            // Handle GetMempool requests
200            Payload::GetMempool(_) => {
201                let msg = Self::handle_get_mempool(db).await?;
202                Ok(Response::new_from_msg(msg, recv_peer))
203            }
204            // Handle Inv messages
205            Payload::Inv(m) => {
206                let msg =
207                    Self::handle_inv(db, m, conf.max_inv_entries, this_peer)
208                        .await?;
209                Ok(Response::new_from_msg(msg, recv_peer))
210            }
211            // Handle GetResource requests
212            Payload::GetResource(m) => {
213                if m.is_expired() {
214                    return Err(anyhow!("message has expired"));
215                }
216
217                match Self::handle_get_resource(db, m, conf.max_inv_entries)
218                    .await
219                {
220                    Ok(msg_list) => {
221                        Ok(Response::new(msg_list, m.get_addr().unwrap()))
222                    }
223                    Err(err) => {
224                        // resource is not found, rebroadcast the request only
225                        // if hops_limit is not reached
226                        if let Some(m) = m.clone_with_hop_decrement() {
227                            // Construct a new message with same
228                            // Message::metadata but with decremented
229                            // hops_limit
230                            let mut msg = msg.clone();
231                            msg.payload = Payload::GetResource(m);
232
233                            debug!("resend a flood request {:?}", msg);
234
235                            let _ = network
236                                .read()
237                                .await
238                                .send_to_alive_peers(msg, 1)
239                                .await;
240                        }
241                        Err(err)
242                    }
243                }
244            }
245            _ => Err(anyhow::anyhow!("unhandled message payload")),
246        }
247    }
248
249    /// Handles GetMempool requests.
250    /// Message flow: GetMempool -> Inv -> GetResource -> Tx
251    async fn handle_get_mempool<DB: database::DB>(
252        db: &Arc<RwLock<DB>>,
253    ) -> Result<Message> {
254        let mut inv = payload::Inv::default();
255
256        db.read()
257            .await
258            .view(|t| {
259                for hash in t.mempool_txs_ids()? {
260                    inv.add_tx_id(hash);
261                }
262
263                if inv.inv_list.is_empty() {
264                    return Err(anyhow::anyhow!("mempool is empty"));
265                }
266
267                Ok(())
268            })
269            .map_err(|e| anyhow::anyhow!(e))?;
270
271        Ok(inv.into())
272    }
273
274    /// Handles GetBlocks message request.
275    ///
276    ///  Message flow: GetBlocks -> Inv -> GetResource -> Block
277    async fn handle_get_blocks<DB: database::DB>(
278        db: &Arc<RwLock<DB>>,
279        m: &payload::GetBlocks,
280        max_entries: usize,
281    ) -> Result<Message> {
282        let mut inv = payload::Inv::default();
283        db.read()
284            .await
285            .view(|t| {
286                let mut locator = t
287                    .block(&m.locator)?
288                    .ok_or_else(|| {
289                        anyhow::anyhow!("could not find locator block")
290                    })?
291                    .header()
292                    .height;
293
294                let mut prev_block_hash = m.locator;
295
296                loop {
297                    locator += 1;
298                    match t.block_hash_by_height(locator)? {
299                        Some(bh) => {
300                            let header =
301                                t.block_header(&bh)?.ok_or_else(|| {
302                                    anyhow!("block header not found")
303                                })?;
304
305                            if header.prev_block_hash != prev_block_hash {
306                                return Err(anyhow::anyhow!(
307                                    "inconsistent chain"
308                                ));
309                            }
310
311                            inv.add_block_from_hash(bh);
312                            prev_block_hash = bh;
313                        }
314                        None => {
315                            break;
316                        }
317                    }
318
319                    //limit to the number of blocks to fetch
320                    if inv.inv_list.len() >= max_entries {
321                        break;
322                    }
323                }
324
325                if inv.inv_list.is_empty() {
326                    return Err(anyhow::anyhow!("no blocks found"));
327                }
328
329                Ok(())
330            })
331            .map_err(|e| anyhow::anyhow!(e))?;
332
333        Ok(inv.into())
334    }
335
336    /// Handles inventory message request.
337    ///
338    /// This takes an inventory message (topics.Inv), checks it for any
339    /// items that the node state is missing, puts these items in a GetResource
340    /// wire message, and sends it back to request the items in full.
341    ///
342    /// An item is a block, a transaction, or a ValidationResult.
343    async fn handle_inv<DB: database::DB>(
344        db: &Arc<RwLock<DB>>,
345        m: &node_data::message::payload::Inv,
346        max_entries: usize,
347        requester_addr: SocketAddr,
348    ) -> Result<Message> {
349        let mut max_entries = max_entries;
350        if m.max_entries > 0 {
351            max_entries = min(max_entries, m.max_entries as usize);
352        }
353
354        let inv = db.read().await.view(|db| {
355            let mut inv = payload::Inv::default();
356            for i in &m.inv_list {
357                debug!(event = "handle_inv", ?i);
358                match i.inv_type {
359                    InvType::BlockFromHeight => {
360                        if let InvParam::Height(height) = &i.param {
361                            if db.block_by_height(*height)?.is_none() {
362                                inv.add_block_from_height(*height);
363                            }
364                        }
365                    }
366                    InvType::BlockFromHash => {
367                        if let InvParam::Hash(hash) = &i.param {
368                            if db.block(hash)?.is_none() {
369                                inv.add_block_from_hash(*hash);
370                            }
371                        }
372                    }
373                    InvType::CandidateFromHash => {
374                        if let InvParam::Hash(hash) = &i.param {
375                            if db.candidate(hash)?.is_none() {
376                                inv.add_candidate_from_hash(*hash);
377                            }
378                        }
379                    }
380                    InvType::MempoolTx => {
381                        if let InvParam::Hash(tx_id) = &i.param {
382                            if db.mempool_tx(*tx_id)?.is_none() {
383                                inv.add_tx_id(*tx_id);
384                            }
385                        }
386                    }
387                    InvType::CandidateFromIteration => {
388                        if let InvParam::Iteration(ch) = &i.param {
389                            if db.candidate_by_iteration(ch)?.is_none() {
390                                inv.add_candidate_from_iteration(*ch);
391                            }
392                        }
393                    }
394                    InvType::ValidationResult => {
395                        if let InvParam::Iteration(ch) = &i.param {
396                            if db.validation_result(ch)?.is_none() {
397                                inv.add_validation_result(*ch);
398                            }
399                        }
400                    }
401                }
402
403                if inv.inv_list.len() >= max_entries {
404                    break;
405                }
406            }
407
408            Ok::<payload::Inv, anyhow::Error>(inv)
409        })?;
410
411        if inv.inv_list.is_empty() {
412            return Err(anyhow::anyhow!("no items to fetch"));
413        }
414
415        // Send GetResource request with disabled rebroadcast (hops_limit = 1),
416        // Inv message is part of one-to-one messaging flows
417        // (GetBlocks/Mempool) so it should not be treated as flooding request
418        Ok(GetResource::new(inv, Some(requester_addr), u64::MAX, 1).into())
419    }
420
421    /// Handles GetResource message request.
422    ///
423    /// The response to a GetResource message is a vector of messages, each of
424    /// which could be either topics.Block or topics.Tx.
425    async fn handle_get_resource<DB: database::DB>(
426        db: &Arc<RwLock<DB>>,
427        m: &node_data::message::payload::GetResource,
428        max_entries: usize,
429    ) -> Result<Vec<Message>> {
430        let mut max_entries = max_entries;
431        if m.get_inv().max_entries > 0 {
432            max_entries = min(max_entries, m.get_inv().max_entries as usize);
433        }
434
435        db.read().await.view(|db| {
436            let res: Vec<Message> = m
437                .get_inv()
438                .inv_list
439                .iter()
440                .filter_map(|i| match i.inv_type {
441                    InvType::BlockFromHeight => {
442                        if let InvParam::Height(height) = &i.param {
443                            db.block_by_height(*height)
444                                .ok()
445                                .flatten()
446                                .map(Message::from)
447                        } else {
448                            None
449                        }
450                    }
451                    InvType::BlockFromHash => {
452                        if let InvParam::Hash(hash) = &i.param {
453                            db.block(hash).ok().flatten().map(Message::from)
454                        } else {
455                            None
456                        }
457                    }
458                    InvType::CandidateFromHash => {
459                        if let InvParam::Hash(hash) = &i.param {
460                            db.block(hash)
461                                .ok()
462                                .flatten()
463                                .or_else(|| db.candidate(hash).ok().flatten())
464                                .map(Message::from)
465                        } else {
466                            None
467                        }
468                    }
469                    InvType::MempoolTx => {
470                        if let InvParam::Hash(tx_id) = &i.param {
471                            db.mempool_tx(*tx_id)
472                                .ok()
473                                .flatten()
474                                .map(Message::from)
475                        } else {
476                            None
477                        }
478                    }
479                    InvType::CandidateFromIteration => {
480                        if let InvParam::Iteration(ch) = &i.param {
481                            db.candidate_by_iteration(ch).ok().flatten().map(
482                                |candidate| {
483                                    Message::from(payload::Candidate {
484                                        candidate,
485                                    })
486                                },
487                            )
488                        } else {
489                            None
490                        }
491                    }
492                    InvType::ValidationResult => {
493                        if let InvParam::Iteration(ch) = &i.param {
494                            db.validation_result(ch).ok().flatten().map(|vr| {
495                                Message::from(payload::ValidationQuorum {
496                                    header: *ch,
497                                    result: vr,
498                                })
499                            })
500                        } else {
501                            None
502                        }
503                    }
504                })
505                .take(max_entries)
506                .collect();
507
508            if res.is_empty() {
509                // If nothing was found, return an error so that the caller is
510                // instructed to rebroadcast the request, if needed
511                debug!("handle_get_resource not found {:?}", m);
512                return Err(anyhow!("not found"));
513            }
514
515            Ok(res)
516        })
517    }
518}