1mod agreement;
10mod anti_entropy;
11mod dkg;
12mod join;
13mod proposals;
14mod relocation;
15mod resource_proof;
16mod service_msgs;
17mod update_section;
18
19pub(crate) use proposals::handle_proposal;
20
21use crate::dbs::Error as DbError;
22use crate::node::{
23 api::cmds::Cmd,
24 core::{DkgSessionInfo, Node, Proposal as CoreProposal, DATA_QUERY_LIMIT},
25 messages::{NodeMsgAuthorityUtils, WireMsgUtils},
26 Error, Event, MessageReceived, Result, MIN_LEVEL_WHEN_FULL,
27};
28use sn_interface::messaging::{
29 data::{ServiceMsg, StorageLevel},
30 signature_aggregator::Error as AggregatorError,
31 system::{
32 JoinRequest, JoinResponse, NodeCmd, NodeEvent, NodeQuery, Proposal as ProposalMsg,
33 SectionAuth as SystemSectionAuth, SystemMsg,
34 },
35 AuthorityProof, DstLocation, MsgId, MsgType, NodeMsgAuthority, SectionAuth, WireMsg,
36};
37use sn_interface::network_knowledge::NetworkKnowledge;
38use sn_interface::types::{log_markers::LogMarker, Peer, PublicKey};
39
40use bls::PublicKey as BlsPublicKey;
41use bytes::Bytes;
42use itertools::Itertools;
43use std::collections::BTreeSet;
44use tokio::time::Duration;
45use xor_name::XorName;
46
/// Chunk size used when splitting lists of data addresses into batches while handling
/// `SendReplicateDataAddress` and `FetchReplicateData` node cmds.
const REPLICATION_BATCH_SIZE: usize = 50;
/// Value passed as `throttle_duration` of `Cmd::ThrottledSendBatchMsgs` when the batched
/// `FetchReplicateData` messages are sent out.
const REPLICATION_MSG_THROTTLE_DURATION: Duration = Duration::from_secs(5);
49
50impl Node {
52 #[instrument(skip(self, original_bytes))]
53 pub(crate) async fn handle_msg(
54 &self,
55 sender: Peer,
56 wire_msg: WireMsg,
57 original_bytes: Option<Bytes>,
58 ) -> Result<Vec<Cmd>> {
59 let mut cmds = vec![];
60
61 if let Some(load_report) = self.comm.tolerated_msgs_per_s(&sender).await {
63 let msg_src = wire_msg.msg_kind().src();
64 if !msg_src.is_end_user() {
65 trace!("Sending BackPressure: {}", load_report);
66 let src_section_pk = self.network_knowledge().section_key().await;
67
68 cmds.push(Cmd::SendMsg {
69 wire_msg: WireMsg::single_src(
70 &*self.info.read().await,
71 msg_src.to_dst(),
72 SystemMsg::BackPressure(load_report),
73 src_section_pk,
74 )?,
75 recipients: vec![sender],
76 })
77 }
78 }
79
80 let msg_id = wire_msg.msg_id();
82 let payload = wire_msg.payload.clone();
84
85 let message_type = match wire_msg.into_msg() {
86 Ok(message_type) => message_type,
87 Err(error) => {
88 error!(
89 "Failed to deserialize message payload ({:?}): {:?}",
90 msg_id, error
91 );
92 return Ok(cmds);
93 }
94 };
95
96 match message_type {
97 MsgType::System {
98 msg_id,
99 msg_authority,
100 dst_location,
101 msg,
102 } => {
103 let mut known_keys: Vec<BlsPublicKey> = self
106 .network_knowledge
107 .section_chain()
108 .await
109 .keys()
110 .copied()
111 .collect();
112 known_keys.extend(self.network_knowledge.prefix_map().section_keys());
113 known_keys.push(*self.network_knowledge.genesis_key());
114
115 if !Self::verify_msg_can_be_trusted(msg_authority.clone(), msg.clone(), &known_keys)
116 {
117 warn!(
118 "Untrusted message ({:?}) dropped from {:?}: {:?} ",
119 msg_id, sender, msg
120 );
121 return Ok(cmds);
122 }
123
124 if self.is_elder().await {
128 match msg {
135 SystemMsg::AntiEntropyRetry { .. }
136 | SystemMsg::AntiEntropyUpdate { .. }
137 | SystemMsg::AntiEntropyRedirect { .. }
138 | SystemMsg::JoinRequest(_)
139 | SystemMsg::JoinAsRelocatedRequest(_) => {
140 trace!(
141 "Entropy check skipped for {:?}, handling message directly",
142 msg_id
143 );
144 }
145 _ => match dst_location.section_pk() {
146 None => {}
147 Some(dst_section_pk) => {
148 let msg_bytes = original_bytes.unwrap_or(wire_msg.serialize()?);
149
150 if let Some(ae_cmd) = self
151 .check_for_entropy(
152 msg_bytes,
154 &msg_authority.src_location(),
155 &dst_section_pk,
156 dst_location.name(),
157 &sender,
158 )
159 .await?
160 {
161 let knowledge = self.network_knowledge.elders().await;
163 let mut known_elders = knowledge.iter().map(|peer| peer.name());
164
165 if known_elders.contains(&sender.name()) {
166 self.dysfunction_tracking
168 .track_knowledge_issue(sender.name())
169 .await
170 .map_err(Error::from)?;
171 }
172
173 cmds.push(ae_cmd);
175 return Ok(cmds);
176 }
177
178 trace!("Entropy check passed. Handling verified msg {:?}", msg_id);
179 }
180 },
181 }
182 }
183
184 let handling_msg_cmds = self
185 .handle_system_msg(
186 sender,
187 msg_id,
188 msg_authority,
189 dst_location,
190 msg,
191 payload,
192 known_keys,
193 )
194 .await?;
195
196 cmds.extend(handling_msg_cmds);
197
198 Ok(cmds)
199 }
200 MsgType::Service {
201 msg_id,
202 msg,
203 dst_location,
204 auth,
205 } => {
206 let dst_name = match msg.dst_address() {
207 Some(name) => name,
208 None => {
209 error!(
210 "Service msg has been dropped since {:?} is not a valid msg to send from a client {}.",
211 msg, sender.addr()
212 );
213 return Ok(vec![]);
214 }
215 };
216
217 let src_location = wire_msg.msg_kind().src();
218
219 if self.is_not_elder().await {
220 trace!("Redirecting from adult to section elders");
221 cmds.push(
222 self.ae_redirect_to_our_elders(sender, &src_location, &wire_msg)
223 .await?,
224 );
225 return Ok(cmds);
226 }
227
228 if let ServiceMsg::Query(_) = msg {
230 let pending_query_length = self.pending_data_queries.len().await;
232
233 if pending_query_length > DATA_QUERY_LIMIT {
234 warn!("Pending queries length exceeded, dropping query {msg:?}");
236 return Ok(vec![]);
237 }
238 }
239
240 let received_section_pk = match dst_location.section_pk() {
242 Some(section_pk) => section_pk,
243 None => {
244 warn!("Dropping service message as there is no valid dst section_pk.");
245 return Ok(cmds);
246 }
247 };
248
249 let msg_bytes = original_bytes.unwrap_or(wire_msg.serialize()?);
250 if let Some(cmd) = self
251 .check_for_entropy(
252 msg_bytes,
254 &src_location,
255 &received_section_pk,
256 dst_name,
257 &sender,
258 )
259 .await?
260 {
261 cmds.push(cmd);
263 return Ok(cmds);
264 }
265
266 cmds.extend(
267 self.handle_service_msg(msg_id, msg, dst_location, auth, sender)
268 .await?,
269 );
270
271 Ok(cmds)
272 }
273 }
274 }
275
276 #[allow(clippy::too_many_arguments)]
278 pub(crate) async fn handle_system_msg(
279 &self,
280 sender: Peer,
281 msg_id: MsgId,
282 mut msg_authority: NodeMsgAuthority,
283 dst_location: DstLocation,
284 msg: SystemMsg,
285 payload: Bytes,
286 known_keys: Vec<BlsPublicKey>,
287 ) -> Result<Vec<Cmd>> {
288 trace!("{:?}", LogMarker::SystemMsgToBeHandled);
289
290 match self
292 .aggregate_msg_and_stop(&mut msg_authority, payload)
293 .await
294 {
295 Ok(false) => {
296 self.handle_valid_msg(msg_id, msg_authority, dst_location, msg, sender, known_keys)
297 .await
298 }
299 Err(Error::InvalidSignatureShare) => {
300 warn!(
301 "Invalid signature on received system message, dropping the message: {:?}",
302 msg_id
303 );
304 Ok(vec![])
305 }
306 Ok(true) => Ok(vec![]),
307 Err(err) => {
308 trace!("handle_system_msg got error {:?}", err);
309 Ok(vec![])
310 }
311 }
312 }
313
314 pub(crate) async fn handle_valid_msg(
317 &self,
318 msg_id: MsgId,
319 msg_authority: NodeMsgAuthority,
320 dst_location: DstLocation,
321 node_msg: SystemMsg,
322 sender: Peer,
323 known_keys: Vec<BlsPublicKey>,
324 ) -> Result<Vec<Cmd>> {
325 let src_name = msg_authority.name();
326 match node_msg {
327 SystemMsg::AntiEntropyUpdate {
328 section_auth,
329 section_signed,
330 proof_chain,
331 members,
332 } => {
333 trace!("Handling msg: AE-Update from {}: {:?}", sender, msg_id,);
334 self.handle_anti_entropy_update_msg(
335 section_auth.into_state(),
336 section_signed,
337 proof_chain,
338 members,
339 )
340 .await
341 }
342 SystemMsg::Relocate(node_state) => {
343 trace!("Handling msg: Relocate from {}: {:?}", sender, msg_id);
344 Ok(self
345 .handle_relocate(node_state)
346 .await?
347 .into_iter()
348 .collect())
349 }
350 SystemMsg::StartConnectivityTest(name) => {
351 trace!(
352 "Handling msg: StartConnectivityTest from {}: {:?}",
353 sender,
354 msg_id
355 );
356 if self.is_not_elder().await {
357 return Ok(vec![]);
358 }
359
360 Ok(vec![Cmd::TestConnectivity(name)])
361 }
362 SystemMsg::JoinAsRelocatedResponse(join_response) => {
363 trace!("Handling msg: JoinAsRelocatedResponse from {}", sender);
364 if let Some(ref mut joining_as_relocated) = *self.relocate_state.write().await {
365 if let Some(cmd) = joining_as_relocated
366 .handle_join_response(*join_response, sender.addr())
367 .await?
368 {
369 return Ok(vec![cmd]);
370 }
371 } else {
372 error!(
373 "No relocation in progress upon receiving {:?}",
374 join_response
375 );
376 }
377
378 Ok(vec![])
379 }
380 SystemMsg::NodeMsgError {
381 error,
382 correlation_id,
383 } => {
384 trace!(
385 "From {:?}({:?}), received error {:?} correlated to {:?}",
386 msg_authority.src_location(),
387 msg_id,
388 error,
389 correlation_id
390 );
391 Ok(vec![])
392 }
393 SystemMsg::AntiEntropyRetry {
394 section_auth,
395 section_signed,
396 proof_chain,
397 bounced_msg,
398 } => {
399 trace!("Handling msg: AE-Retry from {}: {:?}", sender, msg_id,);
400 self.handle_anti_entropy_retry_msg(
401 section_auth.into_state(),
402 section_signed,
403 proof_chain,
404 bounced_msg,
405 sender,
406 )
407 .await
408 }
409 SystemMsg::AntiEntropyRedirect {
410 section_auth,
411 section_signed,
412 section_chain,
413 bounced_msg,
414 } => {
415 trace!("Handling msg: AE-Redirect from {}: {:?}", sender, msg_id);
416 self.handle_anti_entropy_redirect_msg(
417 section_auth.into_state(),
418 section_signed,
419 section_chain,
420 bounced_msg,
421 sender,
422 )
423 .await
424 }
425 SystemMsg::AntiEntropyProbe(_dst) => {
426 trace!("Received Probe message from {}: {:?}", sender, msg_id);
427 Ok(vec![])
428 }
429 SystemMsg::BackPressure(msgs_per_s) => {
430 trace!(
431 "Handling msg: BackPressure with requested {} msgs/s, from {}: {:?}",
432 msgs_per_s,
433 sender,
434 msg_id
435 );
436 self.comm.regulate(&sender, msgs_per_s).await;
438 Ok(vec![])
439 }
440 SystemMsg::JoinResponse(join_response) => {
442 match *join_response {
443 JoinResponse::ApprovalShare {
444 node_state,
445 sig_share,
446 section_chain,
447 members,
448 ..
449 } => {
450 let serialized_details = bincode::serialize(&node_state)?;
451
452 info!(
453 "Relocation: Aggregating received ApprovalShare from {:?}",
454 sender
455 );
456 match self
457 .proposal_aggregator
458 .add(&serialized_details, sig_share.clone())
459 .await
460 {
461 Ok(sig) => {
462 info!("Relocation: Successfully aggregated ApprovalShares for joining the network");
463 let mut cmds = vec![];
464
465 if let Some(ref mut joining_as_relocated) =
466 *self.relocate_state.write().await
467 {
468 let new_node = joining_as_relocated.node.clone();
469 let new_name = new_node.name();
470 let previous_name = self.info.read().await.name();
471 let new_keypair = new_node.keypair.clone();
472
473 info!(
474 "Relocation: switching from {:?} to {:?}",
475 previous_name, new_name
476 );
477
478 let genesis_key = *self.network_knowledge.genesis_key();
479 let prefix_map = self.network_knowledge.prefix_map().clone();
480
481 let (recipients, signed_sap) = if let Ok(sap) =
482 self.network_knowledge.section_by_name(&new_name)
483 {
484 if let Some(signed_sap) =
485 prefix_map.get_signed(&sap.prefix())
486 {
487 (sap.elders().cloned().collect(), signed_sap)
488 } else {
489 warn!(
490 "Relocation: cannot find signed_sap for {:?}",
491 sap.prefix()
492 );
493 return Ok(vec![]);
494 }
495 } else {
496 warn!("Relocation: cannot find recipients to send aggregated JoinApproval");
497 return Ok(vec![]);
498 };
499
500 let new_network_knowledge = NetworkKnowledge::new(
501 genesis_key,
502 section_chain,
503 signed_sap,
504 Some(prefix_map),
505 )?;
506 let _ = new_network_knowledge.merge_members(
507 members
508 .into_iter()
509 .map(|member| member.into_authed_state())
510 .collect(),
511 );
512
513 self.relocate(new_node, new_network_knowledge).await?;
519
520 let section_key = sig_share.public_key_set.public_key();
521 let auth = SystemSectionAuth {
522 value: node_state,
523 sig,
524 };
525 let join_req = JoinRequest {
526 section_key,
527 resource_proof_response: None,
528 aggregated: Some(auth),
529 };
530
531 trace!(
532 "Relocation: Sending aggregated JoinRequest to {:?}",
533 recipients
534 );
535 let node_msg = SystemMsg::JoinRequest(Box::new(join_req));
538 let wire_msg = WireMsg::single_src(
539 &self.info.read().await.clone(),
540 DstLocation::Section {
541 name: new_name,
542 section_pk: section_key,
543 },
544 node_msg,
545 section_key,
546 )?;
547 cmds.push(Cmd::SendMsg {
548 recipients,
549 wire_msg,
550 });
551
552 self.send_event(Event::Relocated {
553 previous_name,
554 new_keypair,
555 })
556 .await;
557
558 trace!("{}", LogMarker::RelocateEnd);
559 } else {
560 warn!("Relocation: self.relocate_state is not in Progress");
561 return Ok(vec![]);
562 }
563
564 Ok(cmds)
565 }
566 Err(AggregatorError::NotEnoughShares) => Ok(vec![]),
567 error => {
568 warn!(
569 "Relocation: Error received as part of signature aggregation during join: {:?}",
570 error
571 );
572 Ok(vec![])
573 }
574 }
575 }
576 _ => {
577 debug!(
578 "Relocation: Ignoring unexpected join response message: {:?}",
579 join_response
580 );
581 Ok(vec![])
582 }
583 }
584 }
585 SystemMsg::DkgFailureAgreement(sig_set) => {
586 trace!("Handling msg: Dkg-FailureAgreement from {}", sender);
587 self.handle_dkg_failure_agreement(&src_name, &sig_set).await
588 }
589 SystemMsg::JoinRequest(join_request) => {
590 trace!("Handling msg: JoinRequest from {}", sender);
591 self.handle_join_request(sender, *join_request).await
592 }
593 SystemMsg::JoinAsRelocatedRequest(join_request) => {
594 trace!("Handling msg: JoinAsRelocatedRequest from {}", sender);
595 if self.is_not_elder().await
596 && join_request.section_key == self.network_knowledge.section_key().await
597 {
598 return Ok(vec![]);
599 }
600
601 self.handle_join_as_relocated_request(sender, *join_request, known_keys)
602 .await
603 }
604 SystemMsg::Propose {
605 proposal,
606 sig_share,
607 } => {
608 if self.is_not_elder().await {
609 trace!("Adult handling a Propose msg from {}: {:?}", sender, msg_id);
610 }
611
612 trace!("Handling msg: Propose from {}: {:?}", sender, msg_id);
613
614 let core_proposal = match proposal {
616 ProposalMsg::Offline(node_state) => {
617 CoreProposal::Offline(node_state.into_state())
618 }
619 ProposalMsg::SectionInfo(sap) => CoreProposal::SectionInfo(sap.into_state()),
620 ProposalMsg::NewElders(sap) => CoreProposal::NewElders(sap.into_authed_state()),
621 ProposalMsg::JoinsAllowed(allowed) => CoreProposal::JoinsAllowed(allowed),
622 };
623
624 handle_proposal(
625 msg_id,
626 core_proposal,
627 sig_share,
628 sender,
629 &self.network_knowledge,
630 &self.proposal_aggregator,
631 )
632 .await
633 }
634 SystemMsg::DkgStart {
635 session_id,
636 prefix,
637 elders,
638 } => {
639 trace!("Handling msg: Dkg-Start {:?} from {}", session_id, sender);
640 if !elders.contains_key(&self.info.read().await.name()) {
641 return Ok(vec![]);
642 }
643 if let NodeMsgAuthority::Section(authority) = msg_authority {
644 let _existing = self.dkg_sessions.write().await.insert(
645 session_id,
646 DkgSessionInfo {
647 prefix,
648 elders: elders.clone(),
649 authority,
650 },
651 );
652 }
653 self.handle_dkg_start(session_id, prefix, elders).await
654 }
655 SystemMsg::DkgMessage {
656 session_id,
657 message,
658 } => {
659 trace!(
660 "Handling msg: Dkg-Msg ({:?} - {:?}) from {}",
661 session_id,
662 message,
663 sender
664 );
665 self.handle_dkg_msg(session_id, message, sender).await
666 }
667 SystemMsg::DkgFailureObservation {
668 session_id,
669 sig,
670 failed_participants,
671 } => {
672 trace!("Handling msg: Dkg-FailureObservation from {}", sender);
673 self.handle_dkg_failure_observation(session_id, &failed_participants, sig)
674 }
675 SystemMsg::DkgNotReady {
676 message,
677 session_id,
678 } => {
679 self.handle_dkg_not_ready(
680 sender,
681 message,
682 session_id,
683 self.network_knowledge.section_key().await,
684 )
685 .await
686 }
687 SystemMsg::DkgRetry {
688 message_history,
689 message,
690 session_id,
691 } => {
692 self.handle_dkg_retry(session_id, message_history, message, sender)
693 .await
694 }
695 SystemMsg::NodeCmd(NodeCmd::RecordStorageLevel { node_id, level, .. }) => {
696 let changed = self.set_storage_level(&node_id, level).await;
697 if changed && level.value() == MIN_LEVEL_WHEN_FULL {
698 *self.joins_allowed.write().await = true;
700 }
701 Ok(vec![])
702 }
703 SystemMsg::NodeCmd(NodeCmd::ReceiveMetadata { metadata }) => {
704 info!("Processing received MetadataExchange packet: {:?}", msg_id);
705 self.set_adult_levels(metadata).await;
706 Ok(vec![])
707 }
708 SystemMsg::NodeEvent(NodeEvent::CouldNotStoreData {
709 node_id,
710 data,
711 full,
712 }) => {
713 info!(
714 "Processing CouldNotStoreData event with MsgId: {:?}",
715 msg_id
716 );
717 return if self.is_elder().await {
718 if full {
719 let changed = self
720 .set_storage_level(&node_id, StorageLevel::from(StorageLevel::MAX)?)
721 .await;
722 if changed {
723 *self.joins_allowed.write().await = true;
725 }
726 }
727 self.replicate_data(data).await
728 } else {
729 error!("Received unexpected message while Adult");
730 Ok(vec![])
731 };
732 }
733 SystemMsg::NodeEvent(NodeEvent::SuspiciousNodesDetected(suspects)) => {
734 info!(
735 "Received probable suspects nodes {suspects:?} Starting preemptive data replication"
736 );
737 debug!("{}", LogMarker::DeviantsDetected);
738
739 return self.replicate_data_of_suspicious_nodes(suspects).await;
740 }
741 SystemMsg::NodeCmd(NodeCmd::ReplicateData(data_collection)) => {
742 info!("ReplicateData MsgId: {:?}", msg_id);
743 return if self.is_elder().await {
744 error!("Received unexpected message while Elder");
745 Ok(vec![])
746 } else {
747 let mut cmds = vec![];
748
749 for data in data_collection {
750 match self.data_storage.store(&data).await {
754 Ok(level_report) => {
755 info!("Storage level report: {:?}", level_report);
756 cmds.extend(self.record_storage_level_if_any(level_report).await);
757 }
758 Err(error) => {
759 match error {
760 DbError::NotEnoughSpace => {
761 error!("Not enough space to store more data");
763
764 let node_id =
765 PublicKey::from(self.info.read().await.keypair.public);
766 let msg =
767 SystemMsg::NodeEvent(NodeEvent::CouldNotStoreData {
768 node_id,
769 data,
770 full: true,
771 });
772
773 cmds.push(self.send_msg_to_our_elders(msg).await?)
774 }
775 _ => {
776 error!("Problem storing data, but it was ignored: {error}");
777 } }
779 }
780 }
781 }
782
783 Ok(cmds)
784 };
785 }
786 SystemMsg::NodeCmd(NodeCmd::SendReplicateDataAddress(data_addresses)) => {
787 info!("ReplicateData MsgId: {:?}", msg_id);
788
789 return if self.is_elder().await {
790 error!("Received unexpected message while Elder");
791 Ok(vec![])
792 } else {
793 let mut data_not_present = vec![];
795
796 for data_address in data_addresses {
797 match self.data_storage.get_from_local_store(&data_address).await {
800 Err(crate::dbs::Error::NoSuchData(_))
801 | Err(crate::dbs::Error::ChunkNotFound(_)) => {
802 info!("to-be-replicated data is not present");
803
804 data_not_present.push(data_address);
806 }
807 Ok(_) => {
808 info!("We already have the data that was asked to be replicated");
809 }
810 Err(e) => {
811 error!("Error Sending FetchReplicateData for replication: {e}");
812 return Ok(vec![]);
813 }
814 }
815 }
816
817 let section_pk = self.section_key_by_name(&sender.name()).await;
818 let src_section_pk = self.network_knowledge().section_key().await;
819 let our_info = &*self.info.read().await;
820 let dst = sn_interface::messaging::DstLocation::Node {
821 name: sender.name(),
822 section_pk,
823 };
824 let mut wire_msgs = vec![];
825
826 for chunked_data_address in
829 &data_not_present.into_iter().chunks(REPLICATION_BATCH_SIZE)
830 {
831 let system_msg = SystemMsg::NodeCmd(NodeCmd::FetchReplicateData(
832 chunked_data_address.collect_vec(),
833 ));
834
835 wire_msgs.push(WireMsg::single_src(
836 our_info,
837 dst,
838 system_msg,
839 src_section_pk,
840 )?);
841 }
842
843 let cmd = Cmd::ThrottledSendBatchMsgs {
844 throttle_duration: REPLICATION_MSG_THROTTLE_DURATION,
845 recipients: vec![sender],
846 wire_msgs,
847 };
848
849 Ok(vec![cmd])
850 };
851 }
852 SystemMsg::NodeCmd(NodeCmd::FetchReplicateData(data_addresses)) => {
853 let mut cmds = vec![];
854 info!("FetchReplicateData MsgId: {:?}", msg_id);
855 return if self.is_elder().await {
856 error!("Received unexpected message while Elder");
857 Ok(vec![])
858 } else {
859 let mut addresses = vec![];
861 for chunked_data_address in
862 &data_addresses.into_iter().chunks(REPLICATION_BATCH_SIZE)
863 {
864 let data_address_collection = chunked_data_address.collect_vec();
865 addresses.push(data_address_collection);
866 }
867
868 for chunked_addresses in addresses {
870 let mut data_collection = vec![];
871 for data_address in chunked_addresses {
872 match self.data_storage.get_for_replication(data_address).await {
873 Ok(data) => {
874 info!("Providing {data_address:?} for replication");
875
876 data_collection.push(data);
877 }
878 Err(e) => {
879 warn!("Error providing data for replication: {e}");
880 return Ok(vec![]);
881 }
882 }
883 }
884
885 cmds.push(Cmd::SignOutgoingSystemMsg {
886 msg: SystemMsg::NodeCmd(NodeCmd::ReplicateData(data_collection)),
887 dst: sn_interface::messaging::DstLocation::Node {
888 name: sender.name(),
889 section_pk: self.section_key_by_name(&sender.name()).await,
890 },
891 });
892 }
893
894 Ok(cmds)
896 };
897 }
898 SystemMsg::NodeCmd(node_cmd) => {
899 self.send_event(Event::MessageReceived {
900 msg_id,
901 src: msg_authority.src_location(),
902 dst: dst_location,
903 msg: Box::new(MessageReceived::NodeCmd(node_cmd)),
904 })
905 .await;
906
907 Ok(vec![])
908 }
909 SystemMsg::NodeQuery(node_query) => {
910 match node_query {
911 NodeQuery::Data {
913 query,
914 auth,
915 origin,
916 correlation_id,
917 } => {
918 let sender_xorname = msg_authority.get_auth_xorname();
921 self.handle_data_query_at_adult(
922 correlation_id,
923 &query,
924 auth,
925 origin,
926 sender_xorname,
927 )
928 .await
929 }
930 _ => {
931 self.send_event(Event::MessageReceived {
932 msg_id,
933 src: msg_authority.src_location(),
934 dst: dst_location,
935 msg: Box::new(MessageReceived::NodeQuery(node_query)),
936 })
937 .await;
938 Ok(vec![])
939 }
940 }
941 }
942 SystemMsg::NodeQueryResponse {
943 response,
944 correlation_id,
945 user,
946 } => {
947 debug!(
948 "{:?}: op_id {:?}, correlation_id: {correlation_id:?}, sender: {sender}",
949 LogMarker::ChunkQueryResponseReceviedFromAdult,
950 response.operation_id()?
951 );
952 let sending_nodes_pk = match msg_authority {
953 NodeMsgAuthority::Node(auth) => PublicKey::from(auth.into_inner().node_ed_pk),
954 _ => return Err(Error::InvalidQueryResponseAuthority),
955 };
956
957 self.handle_data_query_response_at_elder(
958 correlation_id,
959 response,
960 user,
961 sending_nodes_pk,
962 )
963 .await
964 }
965 SystemMsg::DkgSessionUnknown {
966 session_id,
967 message,
968 } => {
969 if let Some(session_info) = self.dkg_sessions.read().await.get(&session_id).cloned()
970 {
971 let DkgSessionInfo {
972 prefix,
973 elders,
974 authority: section_auth,
975 } = session_info;
976 let message_cache = self.dkg_voter.get_cached_msgs(&session_id);
977 trace!(
978 "Sending DkgSessionInfo {{ {:?}, elders {:?}, ... }} to {}",
979 &session_id,
980 elders,
981 &sender
982 );
983
984 let node_msg = SystemMsg::DkgSessionInfo {
985 session_id,
986 elders,
987 prefix,
988 section_auth,
989 message_cache,
990 message,
991 };
992 let section_pk = self.network_knowledge.section_key().await;
993 let wire_msg = WireMsg::single_src(
994 &self.info.read().await.clone(),
995 DstLocation::Node {
996 name: sender.name(),
997 section_pk,
998 },
999 node_msg,
1000 section_pk,
1001 )?;
1002
1003 Ok(vec![Cmd::SendMsg {
1004 recipients: vec![sender],
1005 wire_msg,
1006 }])
1007 } else {
1008 warn!("Unknown DkgSessionInfo: {:?} requested", &session_id);
1009 Ok(vec![])
1010 }
1011 }
1012 SystemMsg::DkgSessionInfo {
1013 session_id,
1014 prefix,
1015 elders,
1016 message_cache,
1017 section_auth,
1018 message,
1019 } => {
1020 let mut cmds = vec![];
1021 let payload = WireMsg::serialize_msg_payload(&SystemMsg::DkgStart {
1023 session_id,
1024 prefix,
1025 elders: elders.clone(),
1026 })?;
1027 let auth = section_auth.clone().into_inner();
1028 if self.network_knowledge.section_key().await == auth.sig.public_key {
1029 if let Err(err) = AuthorityProof::verify(auth, payload) {
1030 error!("Error verifying signature for DkgSessionInfo: {:?}", err);
1031 return Ok(cmds);
1032 } else {
1033 trace!("DkgSessionInfo signature verified");
1034 }
1035 } else {
1036 warn!(
1037 "Cannot verify DkgSessionInfo: {:?}. Unknown key: {:?}!",
1038 &session_id, auth.sig.public_key
1039 );
1040 let chain = self.network_knowledge().section_chain().await;
1041 warn!("Chain: {:?}", chain);
1042 return Ok(cmds);
1043 }
1044 let _existing = self.dkg_sessions.write().await.insert(
1045 session_id,
1046 DkgSessionInfo {
1047 prefix,
1048 elders: elders.clone(),
1049 authority: section_auth,
1050 },
1051 );
1052 trace!("DkgSessionInfo handling {:?} - {:?}", session_id, elders);
1053 cmds.extend(self.handle_dkg_start(session_id, prefix, elders).await?);
1054 cmds.extend(
1055 self.handle_dkg_retry(session_id, message_cache, message, sender)
1056 .await?,
1057 );
1058 Ok(cmds)
1059 }
1060 }
1061 }
1062
1063 async fn record_storage_level_if_any(&self, level: Option<StorageLevel>) -> Vec<Cmd> {
1064 let mut cmds = vec![];
1065 if let Some(level) = level {
1066 info!("Storage has now passed {} % used.", 10 * level.value());
1067 let node_id = PublicKey::from(self.info.read().await.keypair.public);
1068 let node_xorname = XorName::from(node_id);
1069
1070 let msg = SystemMsg::NodeCmd(NodeCmd::RecordStorageLevel {
1072 section: node_xorname,
1073 node_id,
1074 level,
1075 });
1076
1077 let dst = DstLocation::Section {
1078 name: node_xorname,
1079 section_pk: self.network_knowledge.section_key().await,
1080 };
1081
1082 cmds.push(Cmd::SignOutgoingSystemMsg { msg, dst });
1083 }
1084 cmds
1085 }
1086
1087 async fn replicate_data_of_suspicious_nodes(
1088 &self,
1089 suspects: BTreeSet<XorName>,
1090 ) -> Result<Vec<Cmd>> {
1091 debug!("Replicating data of suspect nodes : {suspects:?}");
1092 let our_adults = self
1093 .network_knowledge
1094 .adults()
1095 .await
1096 .iter()
1097 .filter_map(|peer| {
1098 if suspects.contains(&peer.name()) {
1099 None
1100 } else {
1101 Some(peer.name())
1102 }
1103 })
1104 .collect::<BTreeSet<XorName>>();
1105
1106 self.reorganize_data(BTreeSet::new(), suspects, our_adults)
1107 .await
1108 .map_err(crate::node::Error::from)
1109 }
1110
1111 async fn aggregate_msg_and_stop(
1115 &self,
1116 msg_authority: &mut NodeMsgAuthority,
1117 payload: Bytes,
1118 ) -> Result<bool> {
1119 let bls_share_auth = if let NodeMsgAuthority::BlsShare(bls_share_auth) = msg_authority {
1120 bls_share_auth
1121 } else {
1122 return Ok(false);
1123 };
1124
1125 match SectionAuth::try_authorize(
1126 self.message_aggregator.clone(),
1127 bls_share_auth.clone().into_inner(),
1128 &payload,
1129 )
1130 .await
1131 {
1132 Ok(section_auth) => {
1133 info!("Successfully aggregated message");
1134 *msg_authority = NodeMsgAuthority::Section(section_auth);
1135 Ok(false)
1136 }
1137 Err(AggregatorError::NotEnoughShares) => {
1138 info!("Not enough shares to aggregate received message");
1139 Ok(true)
1140 }
1141 Err(err) => {
1142 error!("Error accumulating message at dst: {:?}", err);
1143 Err(Error::InvalidSignatureShare)
1144 }
1145 }
1146 }
1147}