// safe_network/node/core/messaging/handling/mod.rs

1// Copyright 2022 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9mod agreement;
10mod anti_entropy;
11mod dkg;
12mod join;
13mod proposals;
14mod relocation;
15mod resource_proof;
16mod service_msgs;
17mod update_section;
18
19pub(crate) use proposals::handle_proposal;
20
21use crate::dbs::Error as DbError;
22use crate::node::{
23    api::cmds::Cmd,
24    core::{DkgSessionInfo, Node, Proposal as CoreProposal, DATA_QUERY_LIMIT},
25    messages::{NodeMsgAuthorityUtils, WireMsgUtils},
26    Error, Event, MessageReceived, Result, MIN_LEVEL_WHEN_FULL,
27};
28use sn_interface::messaging::{
29    data::{ServiceMsg, StorageLevel},
30    signature_aggregator::Error as AggregatorError,
31    system::{
32        JoinRequest, JoinResponse, NodeCmd, NodeEvent, NodeQuery, Proposal as ProposalMsg,
33        SectionAuth as SystemSectionAuth, SystemMsg,
34    },
35    AuthorityProof, DstLocation, MsgId, MsgType, NodeMsgAuthority, SectionAuth, WireMsg,
36};
37use sn_interface::network_knowledge::NetworkKnowledge;
38use sn_interface::types::{log_markers::LogMarker, Peer, PublicKey};
39
40use bls::PublicKey as BlsPublicKey;
41use bytes::Bytes;
42use itertools::Itertools;
43use std::collections::BTreeSet;
44use tokio::time::Duration;
45use xor_name::XorName;
46
// NOTE(review): neither constant is referenced in the visible part of this module;
// presumably they are consumed by the data-replication code in a submodule — confirm.
/// Batch size used for data replication (presumably the maximum number of data items
/// bundled into a single replication msg — confirm against its users).
const REPLICATION_BATCH_SIZE: usize = 50;
/// Throttle interval for replication msgs (presumably the pause between successive
/// replication batches — confirm against its users).
const REPLICATION_MSG_THROTTLE_DURATION: Duration = Duration::from_secs(5);
49
50// Message handling
51impl Node {
52    #[instrument(skip(self, original_bytes))]
53    pub(crate) async fn handle_msg(
54        &self,
55        sender: Peer,
56        wire_msg: WireMsg,
57        original_bytes: Option<Bytes>,
58    ) -> Result<Vec<Cmd>> {
59        let mut cmds = vec![];
60
61        // Apply backpressure if needed.
62        if let Some(load_report) = self.comm.tolerated_msgs_per_s(&sender).await {
63            let msg_src = wire_msg.msg_kind().src();
64            if !msg_src.is_end_user() {
65                trace!("Sending BackPressure: {}", load_report);
66                let src_section_pk = self.network_knowledge().section_key().await;
67
68                cmds.push(Cmd::SendMsg {
69                    wire_msg: WireMsg::single_src(
70                        &*self.info.read().await,
71                        msg_src.to_dst(),
72                        SystemMsg::BackPressure(load_report),
73                        src_section_pk,
74                    )?,
75                    recipients: vec![sender],
76                })
77            }
78        }
79
80        // Deserialize the payload of the incoming message
81        let msg_id = wire_msg.msg_id();
82        // payload needed for aggregation
83        let payload = wire_msg.payload.clone();
84
85        let message_type = match wire_msg.into_msg() {
86            Ok(message_type) => message_type,
87            Err(error) => {
88                error!(
89                    "Failed to deserialize message payload ({:?}): {:?}",
90                    msg_id, error
91                );
92                return Ok(cmds);
93            }
94        };
95
96        match message_type {
97            MsgType::System {
98                msg_id,
99                msg_authority,
100                dst_location,
101                msg,
102            } => {
103                // Let's now verify the section key in the msg authority is trusted
104                // based on our current knowledge of the network and sections chains.
105                let mut known_keys: Vec<BlsPublicKey> = self
106                    .network_knowledge
107                    .section_chain()
108                    .await
109                    .keys()
110                    .copied()
111                    .collect();
112                known_keys.extend(self.network_knowledge.prefix_map().section_keys());
113                known_keys.push(*self.network_knowledge.genesis_key());
114
115                if !Self::verify_msg_can_be_trusted(msg_authority.clone(), msg.clone(), &known_keys)
116                {
117                    warn!(
118                        "Untrusted message ({:?}) dropped from {:?}: {:?} ",
119                        msg_id, sender, msg
120                    );
121                    return Ok(cmds);
122                }
123
124                // Let's check for entropy before we proceed further
125                // Adult nodes don't need to carry out entropy checking,
126                // however the message shall always be handled.
127                if self.is_elder().await {
128                    // For the case of receiving a join request not matching our prefix,
129                    // we just let the join request handler to deal with it later on.
130                    // We also skip AE check on Anti-Entropy messages
131                    //
132                    // TODO: consider changing the join and "join as relocated" flows to
133                    // make use of AntiEntropy retry/redirect responses.
134                    match msg {
135                        SystemMsg::AntiEntropyRetry { .. }
136                        | SystemMsg::AntiEntropyUpdate { .. }
137                        | SystemMsg::AntiEntropyRedirect { .. }
138                        | SystemMsg::JoinRequest(_)
139                        | SystemMsg::JoinAsRelocatedRequest(_) => {
140                            trace!(
141                                "Entropy check skipped for {:?}, handling message directly",
142                                msg_id
143                            );
144                        }
145                        _ => match dst_location.section_pk() {
146                            None => {}
147                            Some(dst_section_pk) => {
148                                let msg_bytes = original_bytes.unwrap_or(wire_msg.serialize()?);
149
150                                if let Some(ae_cmd) = self
151                                    .check_for_entropy(
152                                        // a cheap clone w/ Bytes
153                                        msg_bytes,
154                                        &msg_authority.src_location(),
155                                        &dst_section_pk,
156                                        dst_location.name(),
157                                        &sender,
158                                    )
159                                    .await?
160                                {
161                                    // we want to log issues with an elder who is out of sync here...
162                                    let knowledge = self.network_knowledge.elders().await;
163                                    let mut known_elders = knowledge.iter().map(|peer| peer.name());
164
165                                    if known_elders.contains(&sender.name()) {
166                                        // we track a dysfunction against our elder here
167                                        self.dysfunction_tracking
168                                            .track_knowledge_issue(sender.name())
169                                            .await
170                                            .map_err(Error::from)?;
171                                    }
172
173                                    // short circuit and send those AE responses
174                                    cmds.push(ae_cmd);
175                                    return Ok(cmds);
176                                }
177
178                                trace!("Entropy check passed. Handling verified msg {:?}", msg_id);
179                            }
180                        },
181                    }
182                }
183
184                let handling_msg_cmds = self
185                    .handle_system_msg(
186                        sender,
187                        msg_id,
188                        msg_authority,
189                        dst_location,
190                        msg,
191                        payload,
192                        known_keys,
193                    )
194                    .await?;
195
196                cmds.extend(handling_msg_cmds);
197
198                Ok(cmds)
199            }
200            MsgType::Service {
201                msg_id,
202                msg,
203                dst_location,
204                auth,
205            } => {
206                let dst_name = match msg.dst_address() {
207                    Some(name) => name,
208                    None => {
209                        error!(
210                            "Service msg has been dropped since {:?} is not a valid msg to send from a client {}.",
211                            msg, sender.addr()
212                        );
213                        return Ok(vec![]);
214                    }
215                };
216
217                let src_location = wire_msg.msg_kind().src();
218
219                if self.is_not_elder().await {
220                    trace!("Redirecting from adult to section elders");
221                    cmds.push(
222                        self.ae_redirect_to_our_elders(sender, &src_location, &wire_msg)
223                            .await?,
224                    );
225                    return Ok(cmds);
226                }
227
228                // First we check if it's query and we have too many on the go at the moment...
229                if let ServiceMsg::Query(_) = msg {
230                    // we have a query, check if we have too many on the go....
231                    let pending_query_length = self.pending_data_queries.len().await;
232
233                    if pending_query_length > DATA_QUERY_LIMIT {
234                        // TODO: check if query is pending for this already.. add to that if that makes sense.
235                        warn!("Pending queries length exceeded, dropping query {msg:?}");
236                        return Ok(vec![]);
237                    }
238                }
239
240                // Then we perform AE checks
241                let received_section_pk = match dst_location.section_pk() {
242                    Some(section_pk) => section_pk,
243                    None => {
244                        warn!("Dropping service message as there is no valid dst section_pk.");
245                        return Ok(cmds);
246                    }
247                };
248
249                let msg_bytes = original_bytes.unwrap_or(wire_msg.serialize()?);
250                if let Some(cmd) = self
251                    .check_for_entropy(
252                        // a cheap clone w/ Bytes
253                        msg_bytes,
254                        &src_location,
255                        &received_section_pk,
256                        dst_name,
257                        &sender,
258                    )
259                    .await?
260                {
261                    // short circuit and send those AE responses
262                    cmds.push(cmd);
263                    return Ok(cmds);
264                }
265
266                cmds.extend(
267                    self.handle_service_msg(msg_id, msg, dst_location, auth, sender)
268                        .await?,
269                );
270
271                Ok(cmds)
272            }
273        }
274    }
275
276    // Handler for all system messages
277    #[allow(clippy::too_many_arguments)]
278    pub(crate) async fn handle_system_msg(
279        &self,
280        sender: Peer,
281        msg_id: MsgId,
282        mut msg_authority: NodeMsgAuthority,
283        dst_location: DstLocation,
284        msg: SystemMsg,
285        payload: Bytes,
286        known_keys: Vec<BlsPublicKey>,
287    ) -> Result<Vec<Cmd>> {
288        trace!("{:?}", LogMarker::SystemMsgToBeHandled);
289
290        // We assume to be aggregated if it contains a BLS Share sig as authority.
291        match self
292            .aggregate_msg_and_stop(&mut msg_authority, payload)
293            .await
294        {
295            Ok(false) => {
296                self.handle_valid_msg(msg_id, msg_authority, dst_location, msg, sender, known_keys)
297                    .await
298            }
299            Err(Error::InvalidSignatureShare) => {
300                warn!(
301                    "Invalid signature on received system message, dropping the message: {:?}",
302                    msg_id
303                );
304                Ok(vec![])
305            }
306            Ok(true) => Ok(vec![]),
307            Err(err) => {
308                trace!("handle_system_msg got error {:?}", err);
309                Ok(vec![])
310            }
311        }
312    }
313
314    // Handler for data messages which have successfully
315    // passed all signature checks and msg verifications
316    pub(crate) async fn handle_valid_msg(
317        &self,
318        msg_id: MsgId,
319        msg_authority: NodeMsgAuthority,
320        dst_location: DstLocation,
321        node_msg: SystemMsg,
322        sender: Peer,
323        known_keys: Vec<BlsPublicKey>,
324    ) -> Result<Vec<Cmd>> {
325        let src_name = msg_authority.name();
326        match node_msg {
327            SystemMsg::AntiEntropyUpdate {
328                section_auth,
329                section_signed,
330                proof_chain,
331                members,
332            } => {
333                trace!("Handling msg: AE-Update from {}: {:?}", sender, msg_id,);
334                self.handle_anti_entropy_update_msg(
335                    section_auth.into_state(),
336                    section_signed,
337                    proof_chain,
338                    members,
339                )
340                .await
341            }
342            SystemMsg::Relocate(node_state) => {
343                trace!("Handling msg: Relocate from {}: {:?}", sender, msg_id);
344                Ok(self
345                    .handle_relocate(node_state)
346                    .await?
347                    .into_iter()
348                    .collect())
349            }
350            SystemMsg::StartConnectivityTest(name) => {
351                trace!(
352                    "Handling msg: StartConnectivityTest from {}: {:?}",
353                    sender,
354                    msg_id
355                );
356                if self.is_not_elder().await {
357                    return Ok(vec![]);
358                }
359
360                Ok(vec![Cmd::TestConnectivity(name)])
361            }
362            SystemMsg::JoinAsRelocatedResponse(join_response) => {
363                trace!("Handling msg: JoinAsRelocatedResponse from {}", sender);
364                if let Some(ref mut joining_as_relocated) = *self.relocate_state.write().await {
365                    if let Some(cmd) = joining_as_relocated
366                        .handle_join_response(*join_response, sender.addr())
367                        .await?
368                    {
369                        return Ok(vec![cmd]);
370                    }
371                } else {
372                    error!(
373                        "No relocation in progress upon receiving {:?}",
374                        join_response
375                    );
376                }
377
378                Ok(vec![])
379            }
380            SystemMsg::NodeMsgError {
381                error,
382                correlation_id,
383            } => {
384                trace!(
385                    "From {:?}({:?}), received error {:?} correlated to {:?}",
386                    msg_authority.src_location(),
387                    msg_id,
388                    error,
389                    correlation_id
390                );
391                Ok(vec![])
392            }
393            SystemMsg::AntiEntropyRetry {
394                section_auth,
395                section_signed,
396                proof_chain,
397                bounced_msg,
398            } => {
399                trace!("Handling msg: AE-Retry from {}: {:?}", sender, msg_id,);
400                self.handle_anti_entropy_retry_msg(
401                    section_auth.into_state(),
402                    section_signed,
403                    proof_chain,
404                    bounced_msg,
405                    sender,
406                )
407                .await
408            }
409            SystemMsg::AntiEntropyRedirect {
410                section_auth,
411                section_signed,
412                section_chain,
413                bounced_msg,
414            } => {
415                trace!("Handling msg: AE-Redirect from {}: {:?}", sender, msg_id);
416                self.handle_anti_entropy_redirect_msg(
417                    section_auth.into_state(),
418                    section_signed,
419                    section_chain,
420                    bounced_msg,
421                    sender,
422                )
423                .await
424            }
425            SystemMsg::AntiEntropyProbe(_dst) => {
426                trace!("Received Probe message from {}: {:?}", sender, msg_id);
427                Ok(vec![])
428            }
429            SystemMsg::BackPressure(msgs_per_s) => {
430                trace!(
431                    "Handling msg: BackPressure with requested {} msgs/s, from {}: {:?}",
432                    msgs_per_s,
433                    sender,
434                    msg_id
435                );
436                // TODO: Factor in med/long term backpressure into general node liveness calculations
437                self.comm.regulate(&sender, msgs_per_s).await;
438                Ok(vec![])
439            }
440            // The AcceptedOnlineShare for relocation will be received here.
441            SystemMsg::JoinResponse(join_response) => {
442                match *join_response {
443                    JoinResponse::ApprovalShare {
444                        node_state,
445                        sig_share,
446                        section_chain,
447                        members,
448                        ..
449                    } => {
450                        let serialized_details = bincode::serialize(&node_state)?;
451
452                        info!(
453                            "Relocation: Aggregating received ApprovalShare from {:?}",
454                            sender
455                        );
456                        match self
457                            .proposal_aggregator
458                            .add(&serialized_details, sig_share.clone())
459                            .await
460                        {
461                            Ok(sig) => {
462                                info!("Relocation: Successfully aggregated ApprovalShares for joining the network");
463                                let mut cmds = vec![];
464
465                                if let Some(ref mut joining_as_relocated) =
466                                    *self.relocate_state.write().await
467                                {
468                                    let new_node = joining_as_relocated.node.clone();
469                                    let new_name = new_node.name();
470                                    let previous_name = self.info.read().await.name();
471                                    let new_keypair = new_node.keypair.clone();
472
473                                    info!(
474                                        "Relocation: switching from {:?} to {:?}",
475                                        previous_name, new_name
476                                    );
477
478                                    let genesis_key = *self.network_knowledge.genesis_key();
479                                    let prefix_map = self.network_knowledge.prefix_map().clone();
480
481                                    let (recipients, signed_sap) = if let Ok(sap) =
482                                        self.network_knowledge.section_by_name(&new_name)
483                                    {
484                                        if let Some(signed_sap) =
485                                            prefix_map.get_signed(&sap.prefix())
486                                        {
487                                            (sap.elders().cloned().collect(), signed_sap)
488                                        } else {
489                                            warn!(
490                                                "Relocation: cannot find signed_sap for {:?}",
491                                                sap.prefix()
492                                            );
493                                            return Ok(vec![]);
494                                        }
495                                    } else {
496                                        warn!("Relocation: cannot find recipients to send aggregated JoinApproval");
497                                        return Ok(vec![]);
498                                    };
499
500                                    let new_network_knowledge = NetworkKnowledge::new(
501                                        genesis_key,
502                                        section_chain,
503                                        signed_sap,
504                                        Some(prefix_map),
505                                    )?;
506                                    let _ = new_network_knowledge.merge_members(
507                                        members
508                                            .into_iter()
509                                            .map(|member| member.into_authed_state())
510                                            .collect(),
511                                    );
512
513                                    // TODO: confirm whether carry out the switch immediately here
514                                    //       or still using the cmd pattern.
515                                    //       As the sending of the JoinRequest as notification
516                                    //       may require the `node` to be switched to new already.
517
518                                    self.relocate(new_node, new_network_knowledge).await?;
519
520                                    let section_key = sig_share.public_key_set.public_key();
521                                    let auth = SystemSectionAuth {
522                                        value: node_state,
523                                        sig,
524                                    };
525                                    let join_req = JoinRequest {
526                                        section_key,
527                                        resource_proof_response: None,
528                                        aggregated: Some(auth),
529                                    };
530
531                                    trace!(
532                                        "Relocation: Sending aggregated JoinRequest to {:?}",
533                                        recipients
534                                    );
535                                    // Resend the JoinRequest now that
536                                    // we have collected enough ApprovalShares from the Elders
537                                    let node_msg = SystemMsg::JoinRequest(Box::new(join_req));
538                                    let wire_msg = WireMsg::single_src(
539                                        &self.info.read().await.clone(),
540                                        DstLocation::Section {
541                                            name: new_name,
542                                            section_pk: section_key,
543                                        },
544                                        node_msg,
545                                        section_key,
546                                    )?;
547                                    cmds.push(Cmd::SendMsg {
548                                        recipients,
549                                        wire_msg,
550                                    });
551
552                                    self.send_event(Event::Relocated {
553                                        previous_name,
554                                        new_keypair,
555                                    })
556                                    .await;
557
558                                    trace!("{}", LogMarker::RelocateEnd);
559                                } else {
560                                    warn!("Relocation:  self.relocate_state is not in Progress");
561                                    return Ok(vec![]);
562                                }
563
564                                Ok(cmds)
565                            }
566                            Err(AggregatorError::NotEnoughShares) => Ok(vec![]),
567                            error => {
568                                warn!(
569                                    "Relocation: Error received as part of signature aggregation during join: {:?}",
570                                    error
571                                );
572                                Ok(vec![])
573                            }
574                        }
575                    }
576                    _ => {
577                        debug!(
578                            "Relocation: Ignoring unexpected join response message: {:?}",
579                            join_response
580                        );
581                        Ok(vec![])
582                    }
583                }
584            }
585            SystemMsg::DkgFailureAgreement(sig_set) => {
586                trace!("Handling msg: Dkg-FailureAgreement from {}", sender);
587                self.handle_dkg_failure_agreement(&src_name, &sig_set).await
588            }
589            SystemMsg::JoinRequest(join_request) => {
590                trace!("Handling msg: JoinRequest from {}", sender);
591                self.handle_join_request(sender, *join_request).await
592            }
593            SystemMsg::JoinAsRelocatedRequest(join_request) => {
594                trace!("Handling msg: JoinAsRelocatedRequest from {}", sender);
595                if self.is_not_elder().await
596                    && join_request.section_key == self.network_knowledge.section_key().await
597                {
598                    return Ok(vec![]);
599                }
600
601                self.handle_join_as_relocated_request(sender, *join_request, known_keys)
602                    .await
603            }
604            SystemMsg::Propose {
605                proposal,
606                sig_share,
607            } => {
608                if self.is_not_elder().await {
609                    trace!("Adult handling a Propose msg from {}: {:?}", sender, msg_id);
610                }
611
612                trace!("Handling msg: Propose from {}: {:?}", sender, msg_id);
613
614                // lets convert our message into a usable proposal for core
615                let core_proposal = match proposal {
616                    ProposalMsg::Offline(node_state) => {
617                        CoreProposal::Offline(node_state.into_state())
618                    }
619                    ProposalMsg::SectionInfo(sap) => CoreProposal::SectionInfo(sap.into_state()),
620                    ProposalMsg::NewElders(sap) => CoreProposal::NewElders(sap.into_authed_state()),
621                    ProposalMsg::JoinsAllowed(allowed) => CoreProposal::JoinsAllowed(allowed),
622                };
623
624                handle_proposal(
625                    msg_id,
626                    core_proposal,
627                    sig_share,
628                    sender,
629                    &self.network_knowledge,
630                    &self.proposal_aggregator,
631                )
632                .await
633            }
634            SystemMsg::DkgStart {
635                session_id,
636                prefix,
637                elders,
638            } => {
639                trace!("Handling msg: Dkg-Start {:?} from {}", session_id, sender);
640                if !elders.contains_key(&self.info.read().await.name()) {
641                    return Ok(vec![]);
642                }
643                if let NodeMsgAuthority::Section(authority) = msg_authority {
644                    let _existing = self.dkg_sessions.write().await.insert(
645                        session_id,
646                        DkgSessionInfo {
647                            prefix,
648                            elders: elders.clone(),
649                            authority,
650                        },
651                    );
652                }
653                self.handle_dkg_start(session_id, prefix, elders).await
654            }
655            SystemMsg::DkgMessage {
656                session_id,
657                message,
658            } => {
659                trace!(
660                    "Handling msg: Dkg-Msg ({:?} - {:?}) from {}",
661                    session_id,
662                    message,
663                    sender
664                );
665                self.handle_dkg_msg(session_id, message, sender).await
666            }
667            SystemMsg::DkgFailureObservation {
668                session_id,
669                sig,
670                failed_participants,
671            } => {
672                trace!("Handling msg: Dkg-FailureObservation from {}", sender);
673                self.handle_dkg_failure_observation(session_id, &failed_participants, sig)
674            }
675            SystemMsg::DkgNotReady {
676                message,
677                session_id,
678            } => {
679                self.handle_dkg_not_ready(
680                    sender,
681                    message,
682                    session_id,
683                    self.network_knowledge.section_key().await,
684                )
685                .await
686            }
687            SystemMsg::DkgRetry {
688                message_history,
689                message,
690                session_id,
691            } => {
692                self.handle_dkg_retry(session_id, message_history, message, sender)
693                    .await
694            }
695            SystemMsg::NodeCmd(NodeCmd::RecordStorageLevel { node_id, level, .. }) => {
696                let changed = self.set_storage_level(&node_id, level).await;
697                if changed && level.value() == MIN_LEVEL_WHEN_FULL {
698                    // ..then we accept a new node in place of the full node
699                    *self.joins_allowed.write().await = true;
700                }
701                Ok(vec![])
702            }
703            SystemMsg::NodeCmd(NodeCmd::ReceiveMetadata { metadata }) => {
704                info!("Processing received MetadataExchange packet: {:?}", msg_id);
705                self.set_adult_levels(metadata).await;
706                Ok(vec![])
707            }
708            SystemMsg::NodeEvent(NodeEvent::CouldNotStoreData {
709                node_id,
710                data,
711                full,
712            }) => {
713                info!(
714                    "Processing CouldNotStoreData event with MsgId: {:?}",
715                    msg_id
716                );
717                return if self.is_elder().await {
718                    if full {
719                        let changed = self
720                            .set_storage_level(&node_id, StorageLevel::from(StorageLevel::MAX)?)
721                            .await;
722                        if changed {
723                            // ..then we accept a new node in place of the full node
724                            *self.joins_allowed.write().await = true;
725                        }
726                    }
727                    self.replicate_data(data).await
728                } else {
729                    error!("Received unexpected message while Adult");
730                    Ok(vec![])
731                };
732            }
733            SystemMsg::NodeEvent(NodeEvent::SuspiciousNodesDetected(suspects)) => {
734                info!(
735                    "Received probable suspects nodes {suspects:?} Starting preemptive data replication"
736                );
737                debug!("{}", LogMarker::DeviantsDetected);
738
739                return self.replicate_data_of_suspicious_nodes(suspects).await;
740            }
741            SystemMsg::NodeCmd(NodeCmd::ReplicateData(data_collection)) => {
742                info!("ReplicateData MsgId: {:?}", msg_id);
743                return if self.is_elder().await {
744                    error!("Received unexpected message while Elder");
745                    Ok(vec![])
746                } else {
747                    let mut cmds = vec![];
748
749                    for data in data_collection {
750                        // We are an adult here, so just store away!
751                        // This may return a DatabaseFull error... but we should have reported storage increase
752                        // well before this
753                        match self.data_storage.store(&data).await {
754                            Ok(level_report) => {
755                                info!("Storage level report: {:?}", level_report);
756                                cmds.extend(self.record_storage_level_if_any(level_report).await);
757                            }
758                            Err(error) => {
759                                match error {
760                                    DbError::NotEnoughSpace => {
761                                        // db full
762                                        error!("Not enough space to store more data");
763
764                                        let node_id =
765                                            PublicKey::from(self.info.read().await.keypair.public);
766                                        let msg =
767                                            SystemMsg::NodeEvent(NodeEvent::CouldNotStoreData {
768                                                node_id,
769                                                data,
770                                                full: true,
771                                            });
772
773                                        cmds.push(self.send_msg_to_our_elders(msg).await?)
774                                    }
775                                    _ => {
776                                        error!("Problem storing data, but it was ignored: {error}");
777                                    } // the rest seem to be non-problematic errors.. (?)
778                                }
779                            }
780                        }
781                    }
782
783                    Ok(cmds)
784                };
785            }
786            SystemMsg::NodeCmd(NodeCmd::SendReplicateDataAddress(data_addresses)) => {
787                info!("ReplicateData MsgId: {:?}", msg_id);
788
789                return if self.is_elder().await {
790                    error!("Received unexpected message while Elder");
791                    Ok(vec![])
792                } else {
793                    // Collection of data addresses that we do not have
794                    let mut data_not_present = vec![];
795
796                    for data_address in data_addresses {
797                        // TODO: Check if the data name falls within our Xor namespace
798                        // Check if we already have the data
799                        match self.data_storage.get_from_local_store(&data_address).await {
800                            Err(crate::dbs::Error::NoSuchData(_))
801                            | Err(crate::dbs::Error::ChunkNotFound(_)) => {
802                                info!("to-be-replicated data is not present");
803
804                                // We do not have the data which we are supposed to have since the new reorg
805                                data_not_present.push(data_address);
806                            }
807                            Ok(_) => {
808                                info!("We already have the data that was asked to be replicated");
809                            }
810                            Err(e) => {
811                                error!("Error Sending FetchReplicateData for replication: {e}");
812                                return Ok(vec![]);
813                            }
814                        }
815                    }
816
817                    let section_pk = self.section_key_by_name(&sender.name()).await;
818                    let src_section_pk = self.network_knowledge().section_key().await;
819                    let our_info = &*self.info.read().await;
820                    let dst = sn_interface::messaging::DstLocation::Node {
821                        name: sender.name(),
822                        section_pk,
823                    };
824                    let mut wire_msgs = vec![];
825
826                    // Chunks the collection into REPLICATION_BATCH_SIZE addresses in a batch. This avoids memory
827                    // explosion in the network when the amount of data to be replicated is high
828                    for chunked_data_address in
829                        &data_not_present.into_iter().chunks(REPLICATION_BATCH_SIZE)
830                    {
831                        let system_msg = SystemMsg::NodeCmd(NodeCmd::FetchReplicateData(
832                            chunked_data_address.collect_vec(),
833                        ));
834
835                        wire_msgs.push(WireMsg::single_src(
836                            our_info,
837                            dst,
838                            system_msg,
839                            src_section_pk,
840                        )?);
841                    }
842
843                    let cmd = Cmd::ThrottledSendBatchMsgs {
844                        throttle_duration: REPLICATION_MSG_THROTTLE_DURATION,
845                        recipients: vec![sender],
846                        wire_msgs,
847                    };
848
849                    Ok(vec![cmd])
850                };
851            }
852            SystemMsg::NodeCmd(NodeCmd::FetchReplicateData(data_addresses)) => {
853                let mut cmds = vec![];
854                info!("FetchReplicateData MsgId: {:?}", msg_id);
855                return if self.is_elder().await {
856                    error!("Received unexpected message while Elder");
857                    Ok(vec![])
858                } else {
859                    // Chunk the full list into REPLICATION_BATCH_SIZE addresses a batch
860                    let mut addresses = vec![];
861                    for chunked_data_address in
862                        &data_addresses.into_iter().chunks(REPLICATION_BATCH_SIZE)
863                    {
864                        let data_address_collection = chunked_data_address.collect_vec();
865                        addresses.push(data_address_collection);
866                    }
867
868                    // Process each batch
869                    for chunked_addresses in addresses {
870                        let mut data_collection = vec![];
871                        for data_address in chunked_addresses {
872                            match self.data_storage.get_for_replication(data_address).await {
873                                Ok(data) => {
874                                    info!("Providing {data_address:?} for replication");
875
876                                    data_collection.push(data);
877                                }
878                                Err(e) => {
879                                    warn!("Error providing data for replication: {e}");
880                                    return Ok(vec![]);
881                                }
882                            }
883                        }
884
885                        cmds.push(Cmd::SignOutgoingSystemMsg {
886                            msg: SystemMsg::NodeCmd(NodeCmd::ReplicateData(data_collection)),
887                            dst: sn_interface::messaging::DstLocation::Node {
888                                name: sender.name(),
889                                section_pk: self.section_key_by_name(&sender.name()).await,
890                            },
891                        });
892                    }
893
894                    // Provide the requested data
895                    Ok(cmds)
896                };
897            }
898            SystemMsg::NodeCmd(node_cmd) => {
899                self.send_event(Event::MessageReceived {
900                    msg_id,
901                    src: msg_authority.src_location(),
902                    dst: dst_location,
903                    msg: Box::new(MessageReceived::NodeCmd(node_cmd)),
904                })
905                .await;
906
907                Ok(vec![])
908            }
909            SystemMsg::NodeQuery(node_query) => {
910                match node_query {
911                    // A request from EndUser - via elders - for locally stored data
912                    NodeQuery::Data {
913                        query,
914                        auth,
915                        origin,
916                        correlation_id,
917                    } => {
918                        // There is no point in verifying a sig from a sender A or B here.
919                        // Send back response to the sending elder
920                        let sender_xorname = msg_authority.get_auth_xorname();
921                        self.handle_data_query_at_adult(
922                            correlation_id,
923                            &query,
924                            auth,
925                            origin,
926                            sender_xorname,
927                        )
928                        .await
929                    }
930                    _ => {
931                        self.send_event(Event::MessageReceived {
932                            msg_id,
933                            src: msg_authority.src_location(),
934                            dst: dst_location,
935                            msg: Box::new(MessageReceived::NodeQuery(node_query)),
936                        })
937                        .await;
938                        Ok(vec![])
939                    }
940                }
941            }
942            SystemMsg::NodeQueryResponse {
943                response,
944                correlation_id,
945                user,
946            } => {
947                debug!(
948                    "{:?}: op_id {:?}, correlation_id: {correlation_id:?}, sender: {sender}",
949                    LogMarker::ChunkQueryResponseReceviedFromAdult,
950                    response.operation_id()?
951                );
952                let sending_nodes_pk = match msg_authority {
953                    NodeMsgAuthority::Node(auth) => PublicKey::from(auth.into_inner().node_ed_pk),
954                    _ => return Err(Error::InvalidQueryResponseAuthority),
955                };
956
957                self.handle_data_query_response_at_elder(
958                    correlation_id,
959                    response,
960                    user,
961                    sending_nodes_pk,
962                )
963                .await
964            }
965            SystemMsg::DkgSessionUnknown {
966                session_id,
967                message,
968            } => {
969                if let Some(session_info) = self.dkg_sessions.read().await.get(&session_id).cloned()
970                {
971                    let DkgSessionInfo {
972                        prefix,
973                        elders,
974                        authority: section_auth,
975                    } = session_info;
976                    let message_cache = self.dkg_voter.get_cached_msgs(&session_id);
977                    trace!(
978                        "Sending DkgSessionInfo {{ {:?}, elders {:?}, ... }} to {}",
979                        &session_id,
980                        elders,
981                        &sender
982                    );
983
984                    let node_msg = SystemMsg::DkgSessionInfo {
985                        session_id,
986                        elders,
987                        prefix,
988                        section_auth,
989                        message_cache,
990                        message,
991                    };
992                    let section_pk = self.network_knowledge.section_key().await;
993                    let wire_msg = WireMsg::single_src(
994                        &self.info.read().await.clone(),
995                        DstLocation::Node {
996                            name: sender.name(),
997                            section_pk,
998                        },
999                        node_msg,
1000                        section_pk,
1001                    )?;
1002
1003                    Ok(vec![Cmd::SendMsg {
1004                        recipients: vec![sender],
1005                        wire_msg,
1006                    }])
1007                } else {
1008                    warn!("Unknown DkgSessionInfo: {:?} requested", &session_id);
1009                    Ok(vec![])
1010                }
1011            }
1012            SystemMsg::DkgSessionInfo {
1013                session_id,
1014                prefix,
1015                elders,
1016                message_cache,
1017                section_auth,
1018                message,
1019            } => {
1020                let mut cmds = vec![];
1021                // Reconstruct the original DKG start message and verify the section signature
1022                let payload = WireMsg::serialize_msg_payload(&SystemMsg::DkgStart {
1023                    session_id,
1024                    prefix,
1025                    elders: elders.clone(),
1026                })?;
1027                let auth = section_auth.clone().into_inner();
1028                if self.network_knowledge.section_key().await == auth.sig.public_key {
1029                    if let Err(err) = AuthorityProof::verify(auth, payload) {
1030                        error!("Error verifying signature for DkgSessionInfo: {:?}", err);
1031                        return Ok(cmds);
1032                    } else {
1033                        trace!("DkgSessionInfo signature verified");
1034                    }
1035                } else {
1036                    warn!(
1037                        "Cannot verify DkgSessionInfo: {:?}. Unknown key: {:?}!",
1038                        &session_id, auth.sig.public_key
1039                    );
1040                    let chain = self.network_knowledge().section_chain().await;
1041                    warn!("Chain: {:?}", chain);
1042                    return Ok(cmds);
1043                }
1044                let _existing = self.dkg_sessions.write().await.insert(
1045                    session_id,
1046                    DkgSessionInfo {
1047                        prefix,
1048                        elders: elders.clone(),
1049                        authority: section_auth,
1050                    },
1051                );
1052                trace!("DkgSessionInfo handling {:?} - {:?}", session_id, elders);
1053                cmds.extend(self.handle_dkg_start(session_id, prefix, elders).await?);
1054                cmds.extend(
1055                    self.handle_dkg_retry(session_id, message_cache, message, sender)
1056                        .await?,
1057                );
1058                Ok(cmds)
1059            }
1060        }
1061    }
1062
1063    async fn record_storage_level_if_any(&self, level: Option<StorageLevel>) -> Vec<Cmd> {
1064        let mut cmds = vec![];
1065        if let Some(level) = level {
1066            info!("Storage has now passed {} % used.", 10 * level.value());
1067            let node_id = PublicKey::from(self.info.read().await.keypair.public);
1068            let node_xorname = XorName::from(node_id);
1069
1070            // we ask the section to record the new level reached
1071            let msg = SystemMsg::NodeCmd(NodeCmd::RecordStorageLevel {
1072                section: node_xorname,
1073                node_id,
1074                level,
1075            });
1076
1077            let dst = DstLocation::Section {
1078                name: node_xorname,
1079                section_pk: self.network_knowledge.section_key().await,
1080            };
1081
1082            cmds.push(Cmd::SignOutgoingSystemMsg { msg, dst });
1083        }
1084        cmds
1085    }
1086
1087    async fn replicate_data_of_suspicious_nodes(
1088        &self,
1089        suspects: BTreeSet<XorName>,
1090    ) -> Result<Vec<Cmd>> {
1091        debug!("Replicating data of suspect nodes : {suspects:?}");
1092        let our_adults = self
1093            .network_knowledge
1094            .adults()
1095            .await
1096            .iter()
1097            .filter_map(|peer| {
1098                if suspects.contains(&peer.name()) {
1099                    None
1100                } else {
1101                    Some(peer.name())
1102                }
1103            })
1104            .collect::<BTreeSet<XorName>>();
1105
1106        self.reorganize_data(BTreeSet::new(), suspects, our_adults)
1107            .await
1108            .map_err(crate::node::Error::from)
1109    }
1110
1111    // Convert the provided NodeMsgAuthority to be a `Section` message
1112    // authority on successful accumulation. Also return 'true' if
1113    // current message shall not be processed any further.
1114    async fn aggregate_msg_and_stop(
1115        &self,
1116        msg_authority: &mut NodeMsgAuthority,
1117        payload: Bytes,
1118    ) -> Result<bool> {
1119        let bls_share_auth = if let NodeMsgAuthority::BlsShare(bls_share_auth) = msg_authority {
1120            bls_share_auth
1121        } else {
1122            return Ok(false);
1123        };
1124
1125        match SectionAuth::try_authorize(
1126            self.message_aggregator.clone(),
1127            bls_share_auth.clone().into_inner(),
1128            &payload,
1129        )
1130        .await
1131        {
1132            Ok(section_auth) => {
1133                info!("Successfully aggregated message");
1134                *msg_authority = NodeMsgAuthority::Section(section_auth);
1135                Ok(false)
1136            }
1137            Err(AggregatorError::NotEnoughShares) => {
1138                info!("Not enough shares to aggregate received message");
1139                Ok(true)
1140            }
1141            Err(err) => {
1142                error!("Error accumulating message at dst: {:?}", err);
1143                Err(Error::InvalidSignatureShare)
1144            }
1145        }
1146    }
1147}