1pub mod conf;
8
9use std::cmp::min;
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13use anyhow::{anyhow, Result};
14use async_trait::async_trait;
15use node_data::message::payload::{self, GetResource, InvParam, InvType};
16use node_data::message::{AsyncQueue, Payload, Topics};
17use smallvec::SmallVec;
18use tokio::sync::{RwLock, Semaphore};
19use tracing::{debug, info, warn};
20
21use crate::database::{ConsensusStorage, Ledger, Mempool};
22use crate::{database, vm, LongLivedService, Message, Network};
23
24const TOPICS: &[u8] = &[
25 Topics::GetBlocks as u8,
26 Topics::GetMempool as u8,
27 Topics::Inv as u8,
28 Topics::GetResource as u8,
29];
30
31struct Response {
32 msgs: SmallVec<[Message; 1]>,
36
37 recv_peer: SocketAddr,
39}
40
41impl Response {
42 fn new(msgs: Vec<Message>, recv_peer: SocketAddr) -> Self {
43 Self {
44 msgs: SmallVec::from_vec(msgs),
45 recv_peer,
46 }
47 }
48
49 fn new_from_msg(msg: Message, recv_peer: SocketAddr) -> Self {
51 Self {
52 msgs: SmallVec::from_buf([msg]),
53 recv_peer,
54 }
55 }
56}
57pub struct DataBrokerSrv {
68 inbound: AsyncQueue<Message>,
71
72 limit_ongoing_requests: Arc<Semaphore>,
74
75 conf: conf::Params,
76}
77
78impl DataBrokerSrv {
79 pub fn new(conf: conf::Params) -> Self {
80 info!("DataBrokerSrv::new with conf: {conf:?}");
81 Self {
82 conf,
83 inbound: AsyncQueue::bounded(
84 conf.max_queue_size,
85 "databroker_inbound",
86 ),
87 limit_ongoing_requests: Arc::new(Semaphore::new(
88 conf.max_ongoing_requests,
89 )),
90 }
91 }
92}
93
94#[async_trait]
95impl<N: Network, DB: database::DB, VM: vm::VMExecution>
96 LongLivedService<N, DB, VM> for DataBrokerSrv
97{
98 async fn execute(
99 &mut self,
100 network: Arc<RwLock<N>>,
101 db: Arc<RwLock<DB>>,
102 _vm: Arc<RwLock<VM>>,
103 ) -> anyhow::Result<usize> {
104 if self.conf.max_ongoing_requests == 0 {
105 return Err(anyhow!("max_ongoing_requests must be greater than 0"));
106 }
107
108 LongLivedService::<N, DB, VM>::add_routes(
110 self,
111 TOPICS,
112 self.inbound.clone(),
113 &network,
114 )
115 .await?;
116
117 info!("data_broker service started");
118
119 loop {
120 let permit =
123 self.limit_ongoing_requests.clone().acquire_owned().await?;
124
125 let msg = self.inbound.recv().await?;
127
128 let network = network.clone();
129 let db = db.clone();
130 let conf = self.conf;
131
132 tokio::spawn(async move {
134 match Self::handle_request::<N, DB>(&db, &network, &msg, &conf)
135 .await
136 {
137 Ok(resp) => {
138 let net = network.read().await;
140 for msg in resp.msgs {
141 let send = net.send_to_peer(msg, resp.recv_peer);
142 if let Err(e) = send.await {
143 warn!("Unable to send_to_peer {e}")
144 };
145
146 if let Some(milli_sec) = conf.delay_on_resp_msg {
149 tokio::time::sleep(
150 std::time::Duration::from_millis(milli_sec),
151 )
152 .await;
153 }
154 }
155 }
156 Err(e) => {
157 warn!("error on handling msg: {}", e);
158 }
159 };
160
161 drop(permit);
163 });
164 }
165 }
166
167 fn name(&self) -> &'static str {
169 "data_broker"
170 }
171}
172
173impl DataBrokerSrv {
174 async fn handle_request<N: Network, DB: database::DB>(
176 db: &Arc<RwLock<DB>>,
177 network: &Arc<RwLock<N>>,
178 msg: &Message,
179 conf: &conf::Params,
180 ) -> anyhow::Result<Response> {
181 let recv_peer = msg
184 .metadata
185 .as_ref()
186 .map(|m| m.src_addr)
187 .ok_or_else(|| anyhow::anyhow!("invalid metadata src_addr"))?;
188
189 debug!(event = "request received", ?msg.payload, ?msg.metadata);
190 let this_peer = *network.read().await.public_addr();
191
192 match &msg.payload {
193 Payload::GetBlocks(m) => {
195 let msg = Self::handle_get_blocks(db, m, conf.max_inv_entries)
196 .await?;
197 Ok(Response::new_from_msg(msg, recv_peer))
198 }
199 Payload::GetMempool(_) => {
201 let msg = Self::handle_get_mempool(db).await?;
202 Ok(Response::new_from_msg(msg, recv_peer))
203 }
204 Payload::Inv(m) => {
206 let msg =
207 Self::handle_inv(db, m, conf.max_inv_entries, this_peer)
208 .await?;
209 Ok(Response::new_from_msg(msg, recv_peer))
210 }
211 Payload::GetResource(m) => {
213 if m.is_expired() {
214 return Err(anyhow!("message has expired"));
215 }
216
217 match Self::handle_get_resource(db, m, conf.max_inv_entries)
218 .await
219 {
220 Ok(msg_list) => {
221 Ok(Response::new(msg_list, m.get_addr().unwrap()))
222 }
223 Err(err) => {
224 if let Some(m) = m.clone_with_hop_decrement() {
227 let mut msg = msg.clone();
231 msg.payload = Payload::GetResource(m);
232
233 debug!("resend a flood request {:?}", msg);
234
235 let _ = network
236 .read()
237 .await
238 .send_to_alive_peers(msg, 1)
239 .await;
240 }
241 Err(err)
242 }
243 }
244 }
245 _ => Err(anyhow::anyhow!("unhandled message payload")),
246 }
247 }
248
249 async fn handle_get_mempool<DB: database::DB>(
252 db: &Arc<RwLock<DB>>,
253 ) -> Result<Message> {
254 let mut inv = payload::Inv::default();
255
256 db.read()
257 .await
258 .view(|t| {
259 for hash in t.mempool_txs_ids()? {
260 inv.add_tx_id(hash);
261 }
262
263 if inv.inv_list.is_empty() {
264 return Err(anyhow::anyhow!("mempool is empty"));
265 }
266
267 Ok(())
268 })
269 .map_err(|e| anyhow::anyhow!(e))?;
270
271 Ok(inv.into())
272 }
273
274 async fn handle_get_blocks<DB: database::DB>(
278 db: &Arc<RwLock<DB>>,
279 m: &payload::GetBlocks,
280 max_entries: usize,
281 ) -> Result<Message> {
282 let mut inv = payload::Inv::default();
283 db.read()
284 .await
285 .view(|t| {
286 let mut locator = t
287 .block(&m.locator)?
288 .ok_or_else(|| {
289 anyhow::anyhow!("could not find locator block")
290 })?
291 .header()
292 .height;
293
294 let mut prev_block_hash = m.locator;
295
296 loop {
297 locator += 1;
298 match t.block_hash_by_height(locator)? {
299 Some(bh) => {
300 let header =
301 t.block_header(&bh)?.ok_or_else(|| {
302 anyhow!("block header not found")
303 })?;
304
305 if header.prev_block_hash != prev_block_hash {
306 return Err(anyhow::anyhow!(
307 "inconsistent chain"
308 ));
309 }
310
311 inv.add_block_from_hash(bh);
312 prev_block_hash = bh;
313 }
314 None => {
315 break;
316 }
317 }
318
319 if inv.inv_list.len() >= max_entries {
321 break;
322 }
323 }
324
325 if inv.inv_list.is_empty() {
326 return Err(anyhow::anyhow!("no blocks found"));
327 }
328
329 Ok(())
330 })
331 .map_err(|e| anyhow::anyhow!(e))?;
332
333 Ok(inv.into())
334 }
335
336 async fn handle_inv<DB: database::DB>(
344 db: &Arc<RwLock<DB>>,
345 m: &node_data::message::payload::Inv,
346 max_entries: usize,
347 requester_addr: SocketAddr,
348 ) -> Result<Message> {
349 let mut max_entries = max_entries;
350 if m.max_entries > 0 {
351 max_entries = min(max_entries, m.max_entries as usize);
352 }
353
354 let inv = db.read().await.view(|db| {
355 let mut inv = payload::Inv::default();
356 for i in &m.inv_list {
357 debug!(event = "handle_inv", ?i);
358 match i.inv_type {
359 InvType::BlockFromHeight => {
360 if let InvParam::Height(height) = &i.param {
361 if db.block_by_height(*height)?.is_none() {
362 inv.add_block_from_height(*height);
363 }
364 }
365 }
366 InvType::BlockFromHash => {
367 if let InvParam::Hash(hash) = &i.param {
368 if db.block(hash)?.is_none() {
369 inv.add_block_from_hash(*hash);
370 }
371 }
372 }
373 InvType::CandidateFromHash => {
374 if let InvParam::Hash(hash) = &i.param {
375 if db.candidate(hash)?.is_none() {
376 inv.add_candidate_from_hash(*hash);
377 }
378 }
379 }
380 InvType::MempoolTx => {
381 if let InvParam::Hash(tx_id) = &i.param {
382 if db.mempool_tx(*tx_id)?.is_none() {
383 inv.add_tx_id(*tx_id);
384 }
385 }
386 }
387 InvType::CandidateFromIteration => {
388 if let InvParam::Iteration(ch) = &i.param {
389 if db.candidate_by_iteration(ch)?.is_none() {
390 inv.add_candidate_from_iteration(*ch);
391 }
392 }
393 }
394 InvType::ValidationResult => {
395 if let InvParam::Iteration(ch) = &i.param {
396 if db.validation_result(ch)?.is_none() {
397 inv.add_validation_result(*ch);
398 }
399 }
400 }
401 }
402
403 if inv.inv_list.len() >= max_entries {
404 break;
405 }
406 }
407
408 Ok::<payload::Inv, anyhow::Error>(inv)
409 })?;
410
411 if inv.inv_list.is_empty() {
412 return Err(anyhow::anyhow!("no items to fetch"));
413 }
414
415 Ok(GetResource::new(inv, Some(requester_addr), u64::MAX, 1).into())
419 }
420
421 async fn handle_get_resource<DB: database::DB>(
426 db: &Arc<RwLock<DB>>,
427 m: &node_data::message::payload::GetResource,
428 max_entries: usize,
429 ) -> Result<Vec<Message>> {
430 let mut max_entries = max_entries;
431 if m.get_inv().max_entries > 0 {
432 max_entries = min(max_entries, m.get_inv().max_entries as usize);
433 }
434
435 db.read().await.view(|db| {
436 let res: Vec<Message> = m
437 .get_inv()
438 .inv_list
439 .iter()
440 .filter_map(|i| match i.inv_type {
441 InvType::BlockFromHeight => {
442 if let InvParam::Height(height) = &i.param {
443 db.block_by_height(*height)
444 .ok()
445 .flatten()
446 .map(Message::from)
447 } else {
448 None
449 }
450 }
451 InvType::BlockFromHash => {
452 if let InvParam::Hash(hash) = &i.param {
453 db.block(hash).ok().flatten().map(Message::from)
454 } else {
455 None
456 }
457 }
458 InvType::CandidateFromHash => {
459 if let InvParam::Hash(hash) = &i.param {
460 db.block(hash)
461 .ok()
462 .flatten()
463 .or_else(|| db.candidate(hash).ok().flatten())
464 .map(Message::from)
465 } else {
466 None
467 }
468 }
469 InvType::MempoolTx => {
470 if let InvParam::Hash(tx_id) = &i.param {
471 db.mempool_tx(*tx_id)
472 .ok()
473 .flatten()
474 .map(Message::from)
475 } else {
476 None
477 }
478 }
479 InvType::CandidateFromIteration => {
480 if let InvParam::Iteration(ch) = &i.param {
481 db.candidate_by_iteration(ch).ok().flatten().map(
482 |candidate| {
483 Message::from(payload::Candidate {
484 candidate,
485 })
486 },
487 )
488 } else {
489 None
490 }
491 }
492 InvType::ValidationResult => {
493 if let InvParam::Iteration(ch) = &i.param {
494 db.validation_result(ch).ok().flatten().map(|vr| {
495 Message::from(payload::ValidationQuorum {
496 header: *ch,
497 result: vr,
498 })
499 })
500 } else {
501 None
502 }
503 }
504 })
505 .take(max_entries)
506 .collect();
507
508 if res.is_empty() {
509 debug!("handle_get_resource not found {:?}", m);
512 return Err(anyhow!("not found"));
513 }
514
515 Ok(res)
516 })
517 }
518}